IMPALA-4835: Part 1: simplify I/O mgr mem mgmt and cancellation

In preparation for switching the I/O mgr to the buffer pool, this patch
removes and cleans up a lot of code so that the switchover patch starts
from a cleaner slate.

* Remove the free buffer cache (which will be replaced by the buffer
  pool's own caching).
* Make memory limit exceeded error checking synchronous (in anticipation
  of having to propagate buffer pool errors synchronously).
* Simplify error propagation - remove the (ineffectual) code that
  enqueued BufferDescriptors containing error statuses.
* Document the locking scheme better in a few places, and make the
  locking requirements part of the function signature where it seemed
  reasonable.
* Move ReturnBuffer() to ScanRange, because it is intrinsically
  connected with the lifecycle of a scan range.
* Separate external ReturnBuffer() and internal CleanUpBuffer()
  interfaces - previously callers of ReturnBuffer() were fudging
  the num_buffers_in_reader accounting to make the external interface work.
* Eliminate redundant state in ScanRange: 'eosr_returned_' and
  'is_cancelled_'.
* Clarify the logic around calling Close() for the last
  BufferDescriptor.
  -> There appeared to be an implicit assumption that buffers would be
     freed in the order they were returned from the scan range, so that
     the "eos" buffer was returned last. Instead just count the number
     of outstanding buffers to detect the last one.
  -> Touching the is_cancelled_ field without holding a lock was hard to
     reason about - it violated the locking rules and it was unclear
     whether it was race-free.
* Remove DiskIoMgr::Read() to simplify the interface. It is trivial to
  inline at the call sites.

This will probably regress performance somewhat because of the cache
removal, so my plan is to merge it around the same time as switching
the I/O mgr to allocate from the buffer pool. I'm keeping the patches
separate to make reviewing easier.

Testing:
* Ran exhaustive tests
* Ran the disk-io-mgr-stress-test overnight

Change-Id: If5cb42437d11c13bc4a55c3ab426b66777332bd1
Reviewed-on: http://gerrit.cloudera.org:8080/8414
Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com>
Tested-by: Tim Armstrong <tarmstr...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/65680dc4
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/65680dc4
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/65680dc4

Branch: refs/heads/master
Commit: 65680dc42107db4ff2273c635cedf83d20f0ea94
Parents: 7ce519f
Author: Tim Armstrong <tarmstr...@cloudera.com>
Authored: Sun Oct 29 12:38:47 2017 -0700
Committer: Tim Armstrong <tarmstr...@cloudera.com>
Committed: Fri Feb 23 04:17:41 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc     |   5 +-
 be/src/exec/hdfs-scan-node-base.cc      |  31 +-
 be/src/exec/hdfs-scan-node.cc           |   5 +-
 be/src/exec/scanner-context.cc          |   9 +-
 be/src/runtime/exec-env.cc              |   7 +-
 be/src/runtime/io/disk-io-mgr-stress.cc |  20 +-
 be/src/runtime/io/disk-io-mgr-test.cc   | 171 +++------
 be/src/runtime/io/disk-io-mgr.cc        | 504 +++++----------------------
 be/src/runtime/io/disk-io-mgr.h         | 124 +------
 be/src/runtime/io/request-context.cc    | 148 ++++++--
 be/src/runtime/io/request-context.h     | 187 ++++++----
 be/src/runtime/io/request-ranges.h      | 107 +++---
 be/src/runtime/io/scan-range.cc         | 142 ++++----
 be/src/runtime/mem-tracker.h            |   1 -
 be/src/runtime/test-env.cc              |   2 +-
 be/src/runtime/tmp-file-mgr-test.cc     |   3 +-
 be/src/runtime/tmp-file-mgr.cc          |   2 +-
 be/src/util/impalad-metrics.cc          |  13 +-
 be/src/util/impalad-metrics.h           |   9 -
 19 files changed, 579 insertions(+), 911 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/65680dc4/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc 
b/be/src/exec/hdfs-parquet-scanner.cc
index e279369..bb3d091 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -1440,11 +1440,12 @@ Status HdfsParquetScanner::ProcessFooter() {
 
     unique_ptr<BufferDescriptor> io_buffer;
     RETURN_IF_ERROR(
-        io_mgr->Read(scan_node_->reader_context(), metadata_range, 
&io_buffer));
+        io_mgr->AddScanRange(scan_node_->reader_context(), metadata_range, 
true));
+    RETURN_IF_ERROR(metadata_range->GetNext(&io_buffer));
     DCHECK_EQ(io_buffer->buffer(), metadata_buffer.buffer());
     DCHECK_EQ(io_buffer->len(), metadata_size);
     DCHECK(io_buffer->eosr());
-    io_mgr->ReturnBuffer(move(io_buffer));
+    metadata_range->ReturnBuffer(move(io_buffer));
   }
 
   // Deserialize file header

http://git-wip-us.apache.org/repos/asf/impala/blob/65680dc4/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc 
b/be/src/exec/hdfs-scan-node-base.cc
index b40fb7e..af2488d 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -346,13 +346,10 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
   num_scanner_threads_started_counter_ =
       ADD_COUNTER(runtime_profile(), NUM_SCANNER_THREADS_STARTED, TUnit::UNIT);
 
-  runtime_state_->io_mgr()->set_bytes_read_counter(
-      reader_context_.get(), bytes_read_counter());
-  runtime_state_->io_mgr()->set_read_timer(reader_context_.get(), 
read_timer());
-  runtime_state_->io_mgr()->set_active_read_thread_counter(
-      reader_context_.get(), &active_hdfs_read_thread_counter_);
-  runtime_state_->io_mgr()->set_disks_access_bitmap(
-      reader_context_.get(), &disks_accessed_bitmap_);
+  reader_context_->set_bytes_read_counter(bytes_read_counter());
+  reader_context_->set_read_timer(read_timer());
+  
reader_context_->set_active_read_thread_counter(&active_hdfs_read_thread_counter_);
+  reader_context_->set_disks_accessed_bitmap(&disks_accessed_bitmap_);
 
   average_scanner_thread_concurrency_ = runtime_profile()->AddSamplingCounter(
       AVERAGE_SCANNER_THREAD_CONCURRENCY, &active_scanner_thread_counter_);
@@ -818,20 +815,14 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
       Substitute("Codegen enabled: $0 out of $1", num_enabled, total));
 
   if (reader_context_ != nullptr) {
-    bytes_read_local_->Set(
-        runtime_state_->io_mgr()->bytes_read_local(reader_context_.get()));
-    bytes_read_short_circuit_->Set(
-        
runtime_state_->io_mgr()->bytes_read_short_circuit(reader_context_.get()));
-    bytes_read_dn_cache_->Set(
-        runtime_state_->io_mgr()->bytes_read_dn_cache(reader_context_.get()));
-    num_remote_ranges_->Set(static_cast<int64_t>(
-        runtime_state_->io_mgr()->num_remote_ranges(reader_context_.get())));
-    unexpected_remote_bytes_->Set(
-        
runtime_state_->io_mgr()->unexpected_remote_bytes(reader_context_.get()));
-    cached_file_handles_hit_count_->Set(
-        
runtime_state_->io_mgr()->cached_file_handles_hit_count(reader_context_.get()));
+    bytes_read_local_->Set(reader_context_->bytes_read_local());
+    
bytes_read_short_circuit_->Set(reader_context_->bytes_read_short_circuit());
+    bytes_read_dn_cache_->Set(reader_context_->bytes_read_dn_cache());
+    num_remote_ranges_->Set(reader_context_->num_remote_ranges());
+    unexpected_remote_bytes_->Set(reader_context_->unexpected_remote_bytes());
+    
cached_file_handles_hit_count_->Set(reader_context_->cached_file_handles_hit_count());
     cached_file_handles_miss_count_->Set(
-        
runtime_state_->io_mgr()->cached_file_handles_miss_count(reader_context_.get()));
+        reader_context_->cached_file_handles_miss_count());
 
     if (unexpected_remote_bytes_->value() >= 
UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD) {
       runtime_state_->LogError(ErrorMsg(TErrorCode::GENERAL, Substitute(

http://git-wip-us.apache.org/repos/asf/impala/blob/65680dc4/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 710a8af..b32a743 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -26,6 +26,7 @@
 #include "exec/scanner-context.h"
 #include "runtime/descriptors.h"
 #include "runtime/fragment-instance-state.h"
+#include "runtime/io/request-context.h"
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/runtime-state.h"
 #include "runtime/mem-tracker.h"
@@ -549,9 +550,7 @@ Status HdfsScanNode::ProcessSplit(const 
vector<FilterContext>& filter_ctxs,
 void HdfsScanNode::SetDoneInternal() {
   if (done_) return;
   done_ = true;
-  if (reader_context_ != nullptr) {
-    runtime_state_->io_mgr()->CancelContext(reader_context_.get());
-  }
+  if (reader_context_ != nullptr) reader_context_->Cancel();
   materialized_row_batches_->Shutdown();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/65680dc4/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index abdde07..79e7a85 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -143,8 +143,10 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t 
read_past_size) {
     ScanRange* range = parent_->scan_node_->AllocateScanRange(
         scan_range_->fs(), filename(), read_past_buffer_size, offset, 
partition_id,
         scan_range_->disk_id(), false, BufferOpts::Uncached());
-    RETURN_IF_ERROR(parent_->state_->io_mgr()->Read(
-        parent_->scan_node_->reader_context(), range, &io_buffer_));
+    RETURN_IF_ERROR(parent_->state_->io_mgr()->AddScanRange(
+        parent_->scan_node_->reader_context(), range, true));
+    RETURN_IF_ERROR(range->GetNext(&io_buffer_));
+    DCHECK(io_buffer_->eosr());
   }
 
   DCHECK(io_buffer_ != nullptr);
@@ -324,7 +326,8 @@ Status ScannerContext::Stream::CopyIoToBoundary(int64_t 
num_bytes) {
 
 void ScannerContext::Stream::ReturnIoBuffer() {
   DCHECK(io_buffer_ != nullptr);
-  ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(io_buffer_));
+  ScanRange* range = io_buffer_->scan_range();
+  range->ReturnBuffer(move(io_buffer_));
   io_buffer_pos_ = nullptr;
   io_buffer_bytes_left_ = 0;
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/65680dc4/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 8a51d88..73961de 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -36,9 +36,9 @@
 #include "runtime/client-cache.h"
 #include "runtime/coordinator.h"
 #include "runtime/data-stream-mgr.h"
-#include "runtime/io/disk-io-mgr.h"
 #include "runtime/hbase-table-factory.h"
 #include "runtime/hdfs-fs-cache.h"
+#include "runtime/io/disk-io-mgr.h"
 #include "runtime/krpc-data-stream-mgr.h"
 #include "runtime/lib-cache.h"
 #include "runtime/mem-tracker.h"
@@ -350,10 +350,7 @@ Status ExecEnv::Init() {
   LOG(INFO) << "Buffer pool limit: "
             << PrettyPrinter::Print(buffer_pool_limit, TUnit::BYTES);
 
-  RETURN_IF_ERROR(disk_io_mgr_->Init(mem_tracker_.get()));
-
-  mem_tracker_->AddGcFunction(
-      [this](int64_t bytes_to_free) { 
disk_io_mgr_->GcIoBuffers(bytes_to_free); });
+  RETURN_IF_ERROR(disk_io_mgr_->Init());
 
   // Start services in order to ensure that dependencies between them are met
   if (enable_webserver_) {

http://git-wip-us.apache.org/repos/asf/impala/blob/65680dc4/be/src/runtime/io/disk-io-mgr-stress.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc 
b/be/src/runtime/io/disk-io-mgr-stress.cc
index 8815357..cfe71ab 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress.cc
@@ -77,7 +77,7 @@ DiskIoMgrStress::DiskIoMgrStress(int num_disks, int 
num_threads_per_disk,
 
   io_mgr_.reset(new DiskIoMgr(num_disks, num_threads_per_disk, 
num_threads_per_disk,
       MIN_READ_BUFFER_SIZE, MAX_READ_BUFFER_SIZE));
-  Status status = io_mgr_->Init(&mem_tracker_);
+  Status status = io_mgr_->Init();
   CHECK(status.ok());
 
   // Initialize some data files.  It doesn't really matter how many there are.
@@ -137,7 +137,7 @@ void DiskIoMgrStress::ClientThread(int client_id) {
 
         // Copy the bytes from this read into the result buffer.
         memcpy(read_buffer + file_offset, buffer->buffer(), buffer->len());
-        io_mgr_->ReturnBuffer(move(buffer));
+        range->ReturnBuffer(move(buffer));
         bytes_read += len;
 
         CHECK_GE(bytes_read, 0);
@@ -159,6 +159,7 @@ void DiskIoMgrStress::ClientThread(int client_id) {
     // Unregister the old client and get a new one
     unique_lock<mutex> lock(client->lock);
     io_mgr_->UnregisterContext(client->reader.get());
+    client->reader.reset();
     NewClient(client_id);
   }
 
@@ -170,11 +171,9 @@ void DiskIoMgrStress::ClientThread(int client_id) {
 // Cancel a random reader
 void DiskIoMgrStress::CancelRandomReader() {
   if (!includes_cancellation_) return;
-
-  int rand_client = rand() % num_clients_;
-
-  unique_lock<mutex> lock(clients_[rand_client].lock);
-  io_mgr_->CancelContext(clients_[rand_client].reader.get());
+  Client* rand_client = &clients_[rand() % num_clients_];
+  unique_lock<mutex> lock(rand_client->lock);
+  rand_client->reader->Cancel();
 }
 
 void DiskIoMgrStress::Run(int sec) {
@@ -199,10 +198,12 @@ void DiskIoMgrStress::Run(int sec) {
 
   for (int i = 0; i < num_clients_; ++i) {
     unique_lock<mutex> lock(clients_[i].lock);
-    if (clients_[i].reader != NULL) 
io_mgr_->CancelContext(clients_[i].reader.get());
+    if (clients_[i].reader != NULL) clients_[i].reader->Cancel();
   }
-
   readers_.join_all();
+
+  for (unique_ptr<MemTracker>& mem_tracker : client_mem_trackers_) 
mem_tracker->Close();
+  mem_tracker_.Close();
 }
 
 // Initialize a client to read one of the files at random.  The scan ranges are
@@ -240,6 +241,7 @@ void DiskIoMgrStress::NewClient(int i) {
     assigned_len += range_len;
   }
 
+  if (client_mem_trackers_[i] != nullptr) client_mem_trackers_[i]->Close();
   client_mem_trackers_[i].reset(new MemTracker(-1, "", &mem_tracker_));
   client.reader = io_mgr_->RegisterContext(client_mem_trackers_[i].get());
   Status status = io_mgr_->AddScanRanges(client.reader.get(), 
client.scan_ranges);

http://git-wip-us.apache.org/repos/asf/impala/blob/65680dc4/be/src/runtime/io/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc 
b/be/src/runtime/io/disk-io-mgr-test.cc
index b03ec31..e099285 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -22,11 +22,13 @@
 
 #include "codegen/llvm-codegen.h"
 #include "common/init.h"
-#include "runtime/io/request-context.h"
 #include "runtime/io/disk-io-mgr-stress.h"
 #include "runtime/io/disk-io-mgr.h"
+#include "runtime/io/request-context.h"
 #include "runtime/mem-tracker.h"
+#include "runtime/test-env.h"
 #include "runtime/thread-resource-mgr.h"
+#include "service/fe-support.h"
 #include "testutil/gtest-util.h"
 #include "util/condition-variable.h"
 #include "util/cpu-info.h"
@@ -49,7 +51,10 @@ namespace io {
 class DiskIoMgrTest : public testing::Test {
  public:
 
-  virtual void SetUp() {}
+  virtual void SetUp() {
+    test_env_.reset(new TestEnv);
+    ASSERT_OK(test_env_->Init());
+  }
 
   virtual void TearDown() {
     pool_.Clear();
@@ -117,13 +122,14 @@ class DiskIoMgrTest : public testing::Test {
   static void ValidateSyncRead(DiskIoMgr* io_mgr, RequestContext* reader,
       ScanRange* range, const char* expected, int expected_len = -1) {
     unique_ptr<BufferDescriptor> buffer;
-    ASSERT_OK(io_mgr->Read(reader, range, &buffer));
+    ASSERT_OK(io_mgr->AddScanRange(reader, range, true));
+    ASSERT_OK(range->GetNext(&buffer));
     ASSERT_TRUE(buffer != nullptr);
     EXPECT_EQ(buffer->len(), range->len());
     if (expected_len < 0) expected_len = strlen(expected);
     int cmp = memcmp(buffer->buffer(), expected, expected_len);
     EXPECT_TRUE(cmp == 0);
-    io_mgr->ReturnBuffer(move(buffer));
+    range->ReturnBuffer(move(buffer));
   }
 
   static void ValidateScanRange(DiskIoMgr* io_mgr, ScanRange* range,
@@ -136,13 +142,13 @@ class DiskIoMgrTest : public testing::Test {
       Status status = range->GetNext(&buffer);
       ASSERT_TRUE(status.ok() || status.code() == expected_status.code());
       if (buffer == nullptr || !status.ok()) {
-        if (buffer != nullptr) io_mgr->ReturnBuffer(move(buffer));
+        if (buffer != nullptr) range->ReturnBuffer(move(buffer));
         break;
       }
       ASSERT_LE(buffer->len(), expected_len);
       memcpy(result + range->offset() + buffer->scan_range_offset(),
           buffer->buffer(), buffer->len());
-      io_mgr->ReturnBuffer(move(buffer));
+      range->ReturnBuffer(move(buffer));
     }
     ValidateEmptyOrCorrect(expected, result, expected_len);
   }
@@ -177,6 +183,8 @@ class DiskIoMgrTest : public testing::Test {
     return range;
   }
 
+  boost::scoped_ptr<TestEnv> test_env_;
+
   ObjectPool pool_;
 
   mutex written_mutex_;
@@ -204,14 +212,13 @@ TEST_F(DiskIoMgrTest, SingleWriter) {
 
   scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 1, 10));
   MemTracker reader_mem_tracker(LARGE_MEM_LIMIT);
-  ASSERT_OK(read_io_mgr->Init(&reader_mem_tracker));
-  unique_ptr<RequestContext> reader =
-      read_io_mgr->RegisterContext(&reader_mem_tracker);
+  ASSERT_OK(read_io_mgr->Init());
+  unique_ptr<RequestContext> reader = 
read_io_mgr->RegisterContext(&reader_mem_tracker);
   for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; 
++num_threads_per_disk) {
     for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
       pool_.Clear(); // Destroy scan ranges from previous iterations.
       DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 
1, 10);
-      ASSERT_OK(io_mgr.Init(&mem_tracker));
+      ASSERT_OK(io_mgr.Init());
       unique_ptr<RequestContext> writer = io_mgr.RegisterContext(&mem_tracker);
       for (int i = 0; i < num_ranges; ++i) {
         int32_t* data = pool_.Add(new int32_t);
@@ -247,7 +254,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
   num_ranges_written_ = 0;
   string tmp_file = "/non-existent/file.txt";
   DiskIoMgr io_mgr(1, 1, 1, 1, 10);
-  ASSERT_OK(io_mgr.Init(&mem_tracker));
+  ASSERT_OK(io_mgr.Init());
   unique_ptr<RequestContext> writer = io_mgr.RegisterContext(nullptr);
   int32_t* data = pool_.Add(new int32_t);
   *data = rand();
@@ -308,19 +315,18 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) {
 
   scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 1, 10));
   MemTracker reader_mem_tracker(LARGE_MEM_LIMIT);
-  ASSERT_OK(read_io_mgr->Init(&reader_mem_tracker));
-  unique_ptr<RequestContext> reader =
-      read_io_mgr->RegisterContext(&reader_mem_tracker);
+  ASSERT_OK(read_io_mgr->Init());
+  unique_ptr<RequestContext> reader = 
read_io_mgr->RegisterContext(&reader_mem_tracker);
   for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; 
++num_threads_per_disk) {
     for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
       pool_.Clear(); // Destroy scan ranges from previous iterations.
       DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 
1, 10);
-      ASSERT_OK(io_mgr.Init(&mem_tracker));
+      ASSERT_OK(io_mgr.Init());
       unique_ptr<RequestContext> writer = io_mgr.RegisterContext(&mem_tracker);
       Status validate_status = Status::OK();
       for (int i = 0; i < num_ranges; ++i) {
         if (i == num_ranges_before_cancel) {
-          io_mgr.CancelContext(writer.get());
+          writer->Cancel();
           validate_status = Status::CANCELLED;
         }
         int32_t* data = pool_.Add(new int32_t);
@@ -375,10 +381,9 @@ TEST_F(DiskIoMgrTest, SingleReader) {
         if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
         DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 
num_threads_per_disk, 1, 1);
 
-        ASSERT_OK(io_mgr.Init(&mem_tracker));
+        ASSERT_OK(io_mgr.Init());
         MemTracker reader_mem_tracker;
-        unique_ptr<RequestContext> reader =
-            io_mgr.RegisterContext(&reader_mem_tracker);
+        unique_ptr<RequestContext> reader = 
io_mgr.RegisterContext(&reader_mem_tracker);
 
         vector<ScanRange*> ranges;
         for (int i = 0; i < len; ++i) {
@@ -426,10 +431,9 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) {
       if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
       DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 
1, 1);
 
-      ASSERT_OK(io_mgr.Init(&mem_tracker));
+      ASSERT_OK(io_mgr.Init());
       MemTracker reader_mem_tracker;
-      unique_ptr<RequestContext> reader =
-          io_mgr.RegisterContext(&reader_mem_tracker);
+      unique_ptr<RequestContext> reader = 
io_mgr.RegisterContext(&reader_mem_tracker);
 
       vector<ScanRange*> ranges_first_half;
       vector<ScanRange*> ranges_second_half;
@@ -496,11 +500,9 @@ TEST_F(DiskIoMgrTest, SyncReadTest) {
       DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk,
           MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
 
-      ASSERT_OK(io_mgr.Init(&mem_tracker));
+      ASSERT_OK(io_mgr.Init());
       MemTracker reader_mem_tracker;
-      unique_ptr<RequestContext> reader =
-          io_mgr.RegisterContext(&reader_mem_tracker);
-
+      unique_ptr<RequestContext> reader = 
io_mgr.RegisterContext(&reader_mem_tracker);
       ScanRange* complete_range =
           InitRange(tmp_file, 0, strlen(data), 0, stat_val.st_mtime);
 
@@ -563,10 +565,9 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) {
       if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
       DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 
1, 1);
 
-      ASSERT_OK(io_mgr.Init(&mem_tracker));
+      ASSERT_OK(io_mgr.Init());
       MemTracker reader_mem_tracker;
-      unique_ptr<RequestContext> reader =
-          io_mgr.RegisterContext(&reader_mem_tracker);
+      unique_ptr<RequestContext> reader = 
io_mgr.RegisterContext(&reader_mem_tracker);
 
       vector<ScanRange*> ranges;
       for (int i = 0; i < len; ++i) {
@@ -591,11 +592,11 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) {
             strlen(data), Status::CANCELLED, 0, &num_ranges_processed));
       }
 
-      io_mgr.CancelContext(reader.get());
+      reader->Cancel();
       sched_yield();
 
       threads.join_all();
-      EXPECT_TRUE(io_mgr.context_status(reader.get()).IsCancelled());
+      EXPECT_TRUE(reader->IsCancelled());
       io_mgr.UnregisterContext(reader.get());
       EXPECT_EQ(reader_mem_tracker.consumption(), 0);
     }
@@ -621,7 +622,7 @@ TEST_F(DiskIoMgrTest, MemLimits) {
     MemTracker root_mem_tracker(mem_limit_num_buffers * MAX_BUFFER_SIZE);
     DiskIoMgr io_mgr(1, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
 
-    ASSERT_OK(io_mgr.Init(&root_mem_tracker));
+    ASSERT_OK(io_mgr.Init());
     MemTracker reader_mem_tracker(-1, "Reader", &root_mem_tracker);
     unique_ptr<RequestContext> reader = 
io_mgr.RegisterContext(&reader_mem_tracker);
 
@@ -632,12 +633,13 @@ TEST_F(DiskIoMgrTest, MemLimits) {
     ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges));
 
     // Don't return buffers to force memory pressure
-    vector<unique_ptr<BufferDescriptor>> buffers;
+    vector<pair<ScanRange*, unique_ptr<BufferDescriptor>>> buffers;
 
     AtomicInt32 num_ranges_processed;
     ScanRangeThread(&io_mgr, reader.get(), data, strlen(data), 
Status::MemLimitExceeded(),
         1, &num_ranges_processed);
 
+    bool hit_mem_limit_exceeded = false;
     char result[strlen(data) + 1];
     // Keep reading new ranges without returning buffers. This forces us
     // to go over the limit eventually.
@@ -646,25 +648,27 @@ TEST_F(DiskIoMgrTest, MemLimits) {
       ScanRange* range = nullptr;
       Status status = io_mgr.GetNextRange(reader.get(), &range);
       ASSERT_TRUE(status.ok() || status.IsMemLimitExceeded());
+      hit_mem_limit_exceeded |= status.IsMemLimitExceeded();
       if (range == nullptr) break;
 
       while (true) {
         unique_ptr<BufferDescriptor> buffer;
         Status status = range->GetNext(&buffer);
         ASSERT_TRUE(status.ok() || status.IsMemLimitExceeded());
+        hit_mem_limit_exceeded |= status.IsMemLimitExceeded();
         if (buffer == nullptr) break;
         memcpy(result + range->offset() + buffer->scan_range_offset(),
             buffer->buffer(), buffer->len());
-        buffers.push_back(move(buffer));
+        buffers.emplace_back(range, move(buffer));
       }
       ValidateEmptyOrCorrect(data, result, strlen(data));
     }
 
     for (int i = 0; i < buffers.size(); ++i) {
-      io_mgr.ReturnBuffer(move(buffers[i]));
+      buffers[i].first->ReturnBuffer(move(buffers[i].second));
     }
 
-    EXPECT_TRUE(io_mgr.context_status(reader.get()).IsMemLimitExceeded());
+    EXPECT_TRUE(hit_mem_limit_exceeded) << "Should have run out of memory";
     io_mgr.UnregisterContext(reader.get());
     EXPECT_EQ(reader_mem_tracker.consumption(), 0);
   }
@@ -689,7 +693,7 @@ TEST_F(DiskIoMgrTest, CachedReads) {
   {
     DiskIoMgr io_mgr(num_disks, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
 
-    ASSERT_OK(io_mgr.Init(&mem_tracker));
+    ASSERT_OK(io_mgr.Init());
     MemTracker reader_mem_tracker;
     unique_ptr<RequestContext> reader = 
io_mgr.RegisterContext(&reader_mem_tracker);
 
@@ -762,7 +766,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
       for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
         DiskIoMgr io_mgr(num_disks, threads_per_disk, threads_per_disk, 
MIN_BUFFER_SIZE,
             MAX_BUFFER_SIZE);
-        ASSERT_OK(io_mgr.Init(&mem_tracker));
+        ASSERT_OK(io_mgr.Init());
         for (int file_index = 0; file_index < num_contexts; ++file_index) {
           contexts[file_index] = io_mgr.RegisterContext(&mem_tracker);
         }
@@ -809,7 +813,6 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
             threads.join_all();
           } // for (int context_index
         } // while (read_offset < file_size)
-
         for (int file_index = 0; file_index < num_contexts; ++file_index) {
           io_mgr.UnregisterContext(contexts[file_index].get());
         }
@@ -874,7 +877,7 @@ TEST_F(DiskIoMgrTest, MultipleReader) {
 
         DiskIoMgr io_mgr(num_disks, threads_per_disk, threads_per_disk, 
MIN_BUFFER_SIZE,
             MAX_BUFFER_SIZE);
-        EXPECT_OK(io_mgr.Init(&mem_tracker));
+        ASSERT_OK(io_mgr.Init());
 
         for (int i = 0; i < NUM_READERS; ++i) {
           readers[i] = io_mgr.RegisterContext(&mem_tracker);
@@ -915,76 +918,6 @@ TEST_F(DiskIoMgrTest, StressTest) {
   test.Run(2); // In seconds
 }
 
-TEST_F(DiskIoMgrTest, Buffers) {
-  // Test default min/max buffer size
-  int min_buffer_size = 1024;
-  int max_buffer_size = 8 * 1024 * 1024; // 8 MB
-  MemTracker root_mem_tracker(max_buffer_size * 2);
-
-  DiskIoMgr io_mgr(1, 1, 1, min_buffer_size, max_buffer_size);
-  ASSERT_OK(io_mgr.Init(&root_mem_tracker));
-  ASSERT_EQ(root_mem_tracker.consumption(), 0);
-
-  MemTracker reader_mem_tracker(-1, "Reader", &root_mem_tracker);
-  unique_ptr<RequestContext> reader;
-  reader = io_mgr.RegisterContext(&reader_mem_tracker);
-
-  ScanRange* dummy_range = InitRange("dummy", 0, 0, 0, 0);
-
-  // buffer length should be rounded up to min buffer size
-  int64_t buffer_len = 1;
-  unique_ptr<BufferDescriptor> buffer_desc;
-  buffer_desc = io_mgr.GetFreeBuffer(reader.get(), dummy_range, buffer_len);
-  EXPECT_TRUE(buffer_desc->buffer() != nullptr);
-  EXPECT_EQ(min_buffer_size, buffer_desc->buffer_len());
-  EXPECT_EQ(1, io_mgr.num_allocated_buffers_.Load());
-  io_mgr.FreeBufferMemory(buffer_desc.get());
-  io_mgr.ReturnBuffer(move(buffer_desc));
-  EXPECT_EQ(min_buffer_size, root_mem_tracker.consumption());
-
-  // reuse buffer
-  buffer_len = min_buffer_size;
-  buffer_desc = io_mgr.GetFreeBuffer(reader.get(), dummy_range, buffer_len);
-  EXPECT_TRUE(buffer_desc->buffer() != nullptr);
-  EXPECT_EQ(min_buffer_size, buffer_desc->buffer_len());
-  EXPECT_EQ(1, io_mgr.num_allocated_buffers_.Load());
-  io_mgr.FreeBufferMemory(buffer_desc.get());
-  io_mgr.ReturnBuffer(move(buffer_desc));
-  EXPECT_EQ(min_buffer_size, root_mem_tracker.consumption());
-
-  // bump up to next buffer size
-  buffer_len = min_buffer_size + 1;
-  buffer_desc = io_mgr.GetFreeBuffer(reader.get(), dummy_range, buffer_len);
-  EXPECT_TRUE(buffer_desc->buffer() != nullptr);
-  EXPECT_EQ(min_buffer_size * 2, buffer_desc->buffer_len());
-  EXPECT_EQ(2, io_mgr.num_allocated_buffers_.Load());
-  EXPECT_EQ(min_buffer_size * 3, root_mem_tracker.consumption());
-
-  // gc unused buffer
-  io_mgr.GcIoBuffers();
-  EXPECT_EQ(1, io_mgr.num_allocated_buffers_.Load());
-  EXPECT_EQ(min_buffer_size * 2, root_mem_tracker.consumption());
-
-  io_mgr.FreeBufferMemory(buffer_desc.get());
-  io_mgr.ReturnBuffer(move(buffer_desc));
-
-  // max buffer size
-  buffer_len = max_buffer_size;
-  buffer_desc = io_mgr.GetFreeBuffer(reader.get(), dummy_range, buffer_len);
-  EXPECT_TRUE(buffer_desc->buffer() != nullptr);
-  EXPECT_EQ(max_buffer_size, buffer_desc->buffer_len());
-  EXPECT_EQ(2, io_mgr.num_allocated_buffers_.Load());
-  io_mgr.FreeBufferMemory(buffer_desc.get());
-  io_mgr.ReturnBuffer(move(buffer_desc));
-  EXPECT_EQ(min_buffer_size * 2 + max_buffer_size, 
root_mem_tracker.consumption());
-
-  // gc buffers
-  io_mgr.GcIoBuffers();
-  EXPECT_EQ(io_mgr.num_allocated_buffers_.Load(), 0);
-  EXPECT_EQ(root_mem_tracker.consumption(), 0);
-  io_mgr.UnregisterContext(reader.get());
-}
-
 // IMPALA-2366: handle partial read where range goes past end of file.
 TEST_F(DiskIoMgrTest, PartialRead) {
   MemTracker mem_tracker(LARGE_MEM_LIMIT);
@@ -1000,7 +933,7 @@ TEST_F(DiskIoMgrTest, PartialRead) {
 
   scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, 1, read_len, read_len));
 
-  ASSERT_OK(io_mgr->Init(&mem_tracker));
+  ASSERT_OK(io_mgr->Init());
   MemTracker reader_mem_tracker;
   unique_ptr<RequestContext> reader;
   reader = io_mgr->RegisterContext(&reader_mem_tracker);
@@ -1008,11 +941,12 @@ TEST_F(DiskIoMgrTest, PartialRead) {
   // We should not read past the end of file.
   ScanRange* range = InitRange(tmp_file, 0, read_len, 0, stat_val.st_mtime);
   unique_ptr<BufferDescriptor> buffer;
-  ASSERT_OK(io_mgr->Read(reader.get(), range, &buffer));
+  ASSERT_OK(io_mgr->AddScanRange(reader.get(), range, true));
+  ASSERT_OK(range->GetNext(&buffer));
   ASSERT_TRUE(buffer->eosr());
   ASSERT_EQ(len, buffer->len());
   ASSERT_TRUE(memcmp(buffer->buffer(), data, len) == 0);
-  io_mgr->ReturnBuffer(move(buffer));
+  range->ReturnBuffer(move(buffer));
 
   io_mgr->UnregisterContext(reader.get());
   pool_.Clear();
@@ -1032,7 +966,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
 
   scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, 1, read_len, read_len));
 
-  ASSERT_OK(io_mgr->Init(&mem_tracker));
+  ASSERT_OK(io_mgr->Init());
   // Reader doesn't need to provide mem tracker if it's providing buffers.
   MemTracker* reader_mem_tracker = nullptr;
   unique_ptr<RequestContext> reader;
@@ -1055,7 +989,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
 
     // DiskIoMgr should not have allocated memory.
     EXPECT_EQ(mem_tracker.consumption(), 0);
-    io_mgr->ReturnBuffer(move(io_buffer));
+    range->ReturnBuffer(move(io_buffer));
   }
 
   io_mgr->UnregisterContext(reader.get());
@@ -1072,7 +1006,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
 
   scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, 1, SCAN_LEN, SCAN_LEN));
 
-  ASSERT_OK(io_mgr->Init(&mem_tracker));
+  ASSERT_OK(io_mgr->Init());
   // Reader doesn't need to provide mem tracker if it's providing buffers.
   MemTracker* reader_mem_tracker = nullptr;
   unique_ptr<RequestContext> reader;
@@ -1086,7 +1020,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
 
     /// Also test the cancellation path. Run multiple iterations since it is 
racy whether
     /// the read fails before the cancellation.
-    if (i >= 1) io_mgr->CancelContext(reader.get());
+    if (i >= 1) reader->Cancel();
 
     unique_ptr<BufferDescriptor> io_buffer;
     ASSERT_FALSE(range->GetNext(&io_buffer).ok());
@@ -1114,7 +1048,7 @@ TEST_F(DiskIoMgrTest, VerifyNumThreadsParameter) {
   const int num_io_threads_per_rotational_or_ssd = 2;
   DiskIoMgr io_mgr(1, num_io_threads_per_rotational_or_ssd,
       num_io_threads_per_rotational_or_ssd, 1, 10);
-  ASSERT_OK(io_mgr.Init(&mem_tracker));
+  ASSERT_OK(io_mgr.Init());
   const int num_io_threads = io_mgr.disk_thread_group_.Size();
   ASSERT_TRUE(num_io_threads ==
       num_io_threads_per_rotational_or_ssd + num_io_threads_for_remote_disks);
@@ -1125,5 +1059,6 @@ TEST_F(DiskIoMgrTest, VerifyNumThreadsParameter) {
 int main(int argc, char** argv) {
   ::testing::InitGoogleTest(&argc, argv);
   impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  impala::InitFeSupport();
   return RUN_ALL_TESTS();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/65680dc4/be/src/runtime/io/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index f3d69f1..8c00ef8 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -157,34 +157,6 @@ string DiskIoMgr::DebugString() {
   return ss.str();
 }
 
-BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr,
-    RequestContext* reader, ScanRange* scan_range, uint8_t* buffer,
-    int64_t buffer_len, MemTracker* mem_tracker)
-  : io_mgr_(io_mgr),
-    reader_(reader),
-    mem_tracker_(mem_tracker),
-    scan_range_(scan_range),
-    buffer_(buffer),
-    buffer_len_(buffer_len) {
-  DCHECK(io_mgr != nullptr);
-  DCHECK(scan_range != nullptr);
-  DCHECK(buffer != nullptr);
-  DCHECK_GE(buffer_len, 0);
-  DCHECK_NE(scan_range->external_buffer_tag_ == 
ScanRange::ExternalBufferTag::NO_BUFFER,
-      mem_tracker == nullptr);
-}
-
-void BufferDescriptor::TransferOwnership(MemTracker* dst) {
-  DCHECK(dst != nullptr);
-  DCHECK(!is_client_buffer());
-  // Memory of cached buffers is not tracked against a tracker.
-  if (is_cached()) return;
-  DCHECK(mem_tracker_ != nullptr);
-  dst->Consume(buffer_len_);
-  mem_tracker_->Release(buffer_len_);
-  mem_tracker_ = dst;
-}
-
 WriteRange::WriteRange(
     const string& file, int64_t file_offset, int disk_id, WriteDoneCallback 
callback)
   : RequestRange(RequestType::WRITE), callback_(callback) {
@@ -235,8 +207,6 @@ DiskIoMgr::DiskIoMgr() :
         FLAGS_num_file_handle_cache_partitions,
         FLAGS_unused_file_handle_timeout_sec) {
   DCHECK_LE(READ_SIZE_MIN_VALUE, FLAGS_read_size);
-  int64_t max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, 
min_buffer_size_);
-  free_buffers_.resize(BitUtil::Log2Ceiling64(max_buffer_size_scaled) + 1);
   int num_local_disks = DiskInfo::num_disks();
   if (FLAGS_num_disks < 0 || FLAGS_num_disks > DiskInfo::num_disks()) {
     LOG(WARNING) << "Number of disks specified should be between 0 and the 
number of "
@@ -250,7 +220,7 @@ DiskIoMgr::DiskIoMgr() :
 }
 
 DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_rotational_disk,
-    int threads_per_solid_state_disk, int min_buffer_size, int 
max_buffer_size) :
+    int threads_per_solid_state_disk, int64_t min_buffer_size, int64_t 
max_buffer_size) :
     num_io_threads_per_rotational_disk_(threads_per_rotational_disk),
     num_io_threads_per_solid_state_disk_(threads_per_solid_state_disk),
     max_buffer_size_(max_buffer_size),
@@ -262,8 +232,6 @@ DiskIoMgr::DiskIoMgr(int num_local_disks, int 
threads_per_rotational_disk,
         FileSystemUtil::MaxNumFileHandles()),
         FLAGS_num_file_handle_cache_partitions,
         FLAGS_unused_file_handle_timeout_sec) {
-  int64_t max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, 
min_buffer_size_);
-  free_buffers_.resize(BitUtil::Log2Ceiling64(max_buffer_size_scaled) + 1);
   if (num_local_disks == 0) num_local_disks = DiskInfo::num_disks();
   disk_queues_.resize(num_local_disks + REMOTE_NUM_DISKS);
   CheckSseSupport();
@@ -287,37 +255,22 @@ DiskIoMgr::~DiskIoMgr() {
   for (int i = 0; i < disk_queues_.size(); ++i) {
     if (disk_queues_[i] == nullptr) continue;
     int disk_id = disk_queues_[i]->disk_id;
-    for (list<RequestContext*>::iterator it = 
disk_queues_[i]->request_contexts.begin();
-        it != disk_queues_[i]->request_contexts.end(); ++it) {
-      DCHECK_EQ((*it)->disk_states_[disk_id].num_threads_in_op(), 0);
-      DCHECK((*it)->disk_states_[disk_id].done());
-      (*it)->DecrementDiskRefCount();
+    for (RequestContext* context : disk_queues_[i]->request_contexts) {
+      unique_lock<mutex> context_lock(context->lock_);
+      DCHECK_EQ(context->disk_states_[disk_id].num_threads_in_op(), 0);
+      DCHECK(context->disk_states_[disk_id].done());
+      context->DecrementDiskRefCount(context_lock);
     }
   }
 
-  DCHECK_EQ(num_buffers_in_readers_.Load(), 0);
-
-  // Delete all allocated buffers
-  int num_free_buffers = 0;
-  for (int idx = 0; idx < free_buffers_.size(); ++idx) {
-    num_free_buffers += free_buffers_[idx].size();
-  }
-  DCHECK_EQ(num_allocated_buffers_.Load(), num_free_buffers);
-  GcIoBuffers();
-
   for (int i = 0; i < disk_queues_.size(); ++i) {
     delete disk_queues_[i];
   }
 
-  if (free_buffer_mem_tracker_ != nullptr) free_buffer_mem_tracker_->Close();
   if (cached_read_options_ != nullptr) 
hadoopRzOptionsFree(cached_read_options_);
 }
 
-Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
-  DCHECK(process_mem_tracker != nullptr);
-  free_buffer_mem_tracker_.reset(
-      new MemTracker(-1, "Free Disk IO Buffers", process_mem_tracker, false));
-
+Status DiskIoMgr::Init() {
   for (int i = 0; i < disk_queues_.size(); ++i) {
     disk_queues_[i] = new DiskQueue(i);
     int num_threads_per_disk;
@@ -372,88 +325,6 @@ void DiskIoMgr::UnregisterContext(RequestContext* reader) {
   reader->CancelAndMarkInactive();
 }
 
-// Cancellation requires coordination from multiple threads.  Each thread that 
currently
-// has a reference to the request context must notice the cancel and remove it 
from its
-// tracking structures.  The last thread to touch the context should 
deallocate (aka
-// recycle) the request context object.  Potential threads are:
-//  1. Disk threads that are currently reading for this reader.
-//  2. Caller threads that are waiting in GetNext.
-//
-// The steps are:
-// 1. Cancel will immediately set the context in the Cancelled state.  This 
prevents any
-// other thread from adding more ready buffers to the context (they all take a 
lock and
-// check the state before doing so), or any write ranges to the context.
-// 2. Cancel will call cancel on each ScanRange that is not yet complete, 
unblocking
-// any threads in GetNext(). The reader will see the cancelled Status 
returned. Cancel
-// also invokes the callback for the WriteRanges with the cancelled state.
-// 3. Disk threads notice the context is cancelled either when picking the 
next context
-// to process or when they try to enqueue a ready buffer.  Upon noticing the 
cancelled
-// state, removes the context from the disk queue.  The last thread per disk 
with an
-// outstanding reference to the context decrements the number of disk queues 
the context
-// is on.
-void DiskIoMgr::CancelContext(RequestContext* context) {
-  context->Cancel(Status::CANCELLED);
-}
-
-void DiskIoMgr::set_read_timer(RequestContext* r, RuntimeProfile::Counter* c) {
-  r->read_timer_ = c;
-}
-
-void DiskIoMgr::set_bytes_read_counter(RequestContext* r, 
RuntimeProfile::Counter* c) {
-  r->bytes_read_counter_ = c;
-}
-
-void DiskIoMgr::set_active_read_thread_counter(RequestContext* r,
-    RuntimeProfile::Counter* c) {
-  r->active_read_thread_counter_ = c;
-}
-
-void DiskIoMgr::set_disks_access_bitmap(RequestContext* r,
-    RuntimeProfile::Counter* c) {
-  r->disks_accessed_bitmap_ = c;
-}
-
-int64_t DiskIoMgr::queue_size(RequestContext* reader) const {
-  return reader->num_ready_buffers_.Load();
-}
-
-Status DiskIoMgr::context_status(RequestContext* context) const {
-  unique_lock<mutex> lock(context->lock_);
-  return context->status_;
-}
-
-int64_t DiskIoMgr::bytes_read_local(RequestContext* reader) const {
-  return reader->bytes_read_local_.Load();
-}
-
-int64_t DiskIoMgr::bytes_read_short_circuit(RequestContext* reader) const {
-  return reader->bytes_read_short_circuit_.Load();
-}
-
-int64_t DiskIoMgr::bytes_read_dn_cache(RequestContext* reader) const {
-  return reader->bytes_read_dn_cache_.Load();
-}
-
-int DiskIoMgr::num_remote_ranges(RequestContext* reader) const {
-  return reader->num_remote_ranges_.Load();
-}
-
-int64_t DiskIoMgr::unexpected_remote_bytes(RequestContext* reader) const {
-  return reader->unexpected_remote_bytes_.Load();
-}
-
-int DiskIoMgr::cached_file_handles_hit_count(RequestContext* reader) const {
-  return reader->cached_file_handles_hit_count_.Load();
-}
-
-int DiskIoMgr::cached_file_handles_miss_count(RequestContext* reader) const {
-  return reader->cached_file_handles_miss_count_.Load();
-}
-
-int64_t DiskIoMgr::GetReadThroughput() {
-  return RuntimeProfile::UnitsPerSecond(&total_bytes_read_counter_, 
&read_timer_);
-}
-
 Status DiskIoMgr::ValidateScanRange(ScanRange* range) {
   int disk_id = range->disk_id_;
   if (disk_id < 0 || disk_id >= disk_queues_.size()) {
@@ -485,10 +356,7 @@ Status DiskIoMgr::AddScanRanges(RequestContext* reader,
   unique_lock<mutex> reader_lock(reader->lock_);
   DCHECK(reader->Validate()) << endl << reader->DebugString();
 
-  if (reader->state_ == RequestContext::Cancelled) {
-    DCHECK(!reader->status_.ok());
-    return reader->status_;
-  }
+  if (reader->state_ == RequestContext::Cancelled) return Status::CANCELLED;
 
   // Add each range to the queue of the disk the range is on
   for (int i = 0; i < ranges.size(); ++i) {
@@ -507,7 +375,7 @@ Status DiskIoMgr::AddScanRanges(RequestContext* reader,
         continue;
       }
     }
-    reader->AddRequestRange(range, schedule_immediately);
+    reader->AddRequestRange(reader_lock, range, schedule_immediately);
   }
   DCHECK(reader->Validate()) << endl << reader->DebugString();
 
@@ -533,8 +401,7 @@ Status DiskIoMgr::GetNextRange(RequestContext* reader, 
ScanRange** range) {
 
   while (true) {
     if (reader->state_ == RequestContext::Cancelled) {
-      DCHECK(!reader->status_.ok());
-      status = reader->status_;
+      status = Status::CANCELLED;
       break;
     }
 
@@ -553,7 +420,7 @@ Status DiskIoMgr::GetNextRange(RequestContext* reader, 
ScanRange** range) {
       if (cached_read_succeeded) return Status::OK();
 
       // This range ended up not being cached. Loop again and pick up a new 
range.
-      reader->AddRequestRange(*range, false);
+      reader->AddRequestRange(reader_lock, *range, false);
       DCHECK(reader->Validate()) << endl << reader->DebugString();
       *range = nullptr;
       continue;
@@ -569,185 +436,13 @@ Status DiskIoMgr::GetNextRange(RequestContext* reader, 
ScanRange** range) {
       // Set this to nullptr, the next time this disk runs for this reader, it 
will
       // get another range ready.
       reader->disk_states_[disk_id].set_next_scan_range_to_start(nullptr);
-      reader->ScheduleScanRange(*range);
+      reader->ScheduleScanRange(reader_lock, *range);
       break;
     }
   }
   return status;
 }
 
-Status DiskIoMgr::Read(RequestContext* reader,
-    ScanRange* range, std::unique_ptr<BufferDescriptor>* buffer) {
-  DCHECK(range != nullptr);
-  DCHECK(buffer != nullptr);
-  *buffer = nullptr;
-
-  if (range->len() > max_buffer_size_
-      && range->external_buffer_tag_ != 
ScanRange::ExternalBufferTag::CLIENT_BUFFER) {
-    return Status(TErrorCode::DISK_IO_ERROR, Substitute("Internal error: 
cannot "
-        "perform sync read of '$0' bytes that is larger than the max read 
buffer size "
-        "'$1'.", range->len(), max_buffer_size_));
-  }
-
-  vector<ScanRange*> ranges;
-  ranges.push_back(range);
-  RETURN_IF_ERROR(AddScanRanges(reader, ranges, true));
-  RETURN_IF_ERROR(range->GetNext(buffer));
-  DCHECK((*buffer) != nullptr);
-  DCHECK((*buffer)->eosr());
-  return Status::OK();
-}
-
-void DiskIoMgr::ReturnBuffer(unique_ptr<BufferDescriptor> buffer_desc) {
-  DCHECK(buffer_desc != nullptr);
-  if (!buffer_desc->status_.ok()) DCHECK(buffer_desc->buffer_ == nullptr);
-
-  RequestContext* reader = buffer_desc->reader_;
-  if (buffer_desc->buffer_ != nullptr) {
-    if (!buffer_desc->is_cached() && !buffer_desc->is_client_buffer()) {
-      // Buffers the were not allocated by DiskIoMgr don't need to be freed.
-      FreeBufferMemory(buffer_desc.get());
-    }
-    buffer_desc->buffer_ = nullptr;
-    num_buffers_in_readers_.Add(-1);
-    reader->num_buffers_in_reader_.Add(-1);
-  } else {
-    // A nullptr buffer means there was an error in which case there is no 
buffer
-    // to return.
-  }
-
-  if (buffer_desc->eosr_ || buffer_desc->scan_range_->is_cancelled_) {
-    // Need to close the scan range if returning the last buffer or the scan 
range
-    // has been cancelled (and the caller might never get the last buffer).
-    // Close() is idempotent so multiple cancelled buffers is okay.
-    buffer_desc->scan_range_->Close();
-  }
-}
-
-unique_ptr<BufferDescriptor> DiskIoMgr::GetFreeBuffer(
-    RequestContext* reader, ScanRange* range, int64_t buffer_size) {
-  DCHECK_LE(buffer_size, max_buffer_size_);
-  DCHECK_GT(buffer_size, 0);
-  buffer_size = min(static_cast<int64_t>(max_buffer_size_), buffer_size);
-  int idx = free_buffers_idx(buffer_size);
-  // Quantize buffer size to nearest power of 2 greater than the specified 
buffer size and
-  // convert to bytes
-  buffer_size = (1LL << idx) * min_buffer_size_;
-
-  // Track memory against the reader. This is checked the next time we start
-  // a read for the next reader in DiskIoMgr::GetNextScanRange().
-  DCHECK(reader->mem_tracker_ != nullptr);
-  reader->mem_tracker_->Consume(buffer_size);
-
-  uint8_t* buffer = nullptr;
-  {
-    unique_lock<mutex> lock(free_buffers_lock_);
-    if (free_buffers_[idx].empty()) {
-      num_allocated_buffers_.Add(1);
-      if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != nullptr) {
-        ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(1L);
-      }
-      if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != nullptr) {
-        ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(buffer_size);
-      }
-      // We already tracked this memory against the reader's MemTracker.
-      buffer = new uint8_t[buffer_size];
-    } else {
-      if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != nullptr) {
-        ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(-1L);
-      }
-      buffer = free_buffers_[idx].front();
-      free_buffers_[idx].pop_front();
-      free_buffer_mem_tracker_->Release(buffer_size);
-      ASAN_UNPOISON_MEMORY_REGION(buffer, buffer_size);
-    }
-  }
-
-  // Validate more invariants.
-  DCHECK(range != nullptr);
-  DCHECK(reader != nullptr);
-  DCHECK(buffer != nullptr);
-  return unique_ptr<BufferDescriptor>(new BufferDescriptor(
-      this, reader, range, buffer, buffer_size, reader->mem_tracker_));
-}
-
-void DiskIoMgr::GcIoBuffers(int64_t bytes_to_free) {
-  unique_lock<mutex> lock(free_buffers_lock_);
-  int buffers_freed = 0;
-  int bytes_freed = 0;
-  // Free small-to-large to avoid retaining many small buffers and fragmenting 
memory.
-  for (int idx = 0; idx < free_buffers_.size(); ++idx) {
-    deque<uint8_t*>* free_buffers = &free_buffers_[idx];
-    while (
-        !free_buffers->empty() && (bytes_to_free == -1 || bytes_freed <= 
bytes_to_free)) {
-      uint8_t* buffer = free_buffers->front();
-      free_buffers->pop_front();
-      int64_t buffer_size = (1LL << idx) * min_buffer_size_;
-      ASAN_UNPOISON_MEMORY_REGION(buffer, buffer_size);
-      delete[] buffer;
-      free_buffer_mem_tracker_->Release(buffer_size);
-      num_allocated_buffers_.Add(-1);
-
-      ++buffers_freed;
-      bytes_freed += buffer_size;
-    }
-    if (bytes_to_free != -1 && bytes_freed >= bytes_to_free) break;
-  }
-
-  if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != nullptr) {
-    ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(-buffers_freed);
-  }
-  if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != nullptr) {
-    ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(-bytes_freed);
-  }
-  if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != nullptr) {
-    ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(-buffers_freed);
-  }
-}
-
-void DiskIoMgr::FreeBufferMemory(BufferDescriptor* desc) {
-  DCHECK(!desc->is_cached());
-  DCHECK(!desc->is_client_buffer());
-  uint8_t* buffer = desc->buffer_;
-  int64_t buffer_size = desc->buffer_len_;
-  int idx = free_buffers_idx(buffer_size);
-  DCHECK_EQ(BitUtil::Ceil(buffer_size, min_buffer_size_) & ~(1LL << idx), 0)
-      << "buffer_size_ / min_buffer_size_ should be power of 2, got 
buffer_size = "
-      << buffer_size << ", min_buffer_size_ = " << min_buffer_size_;
-
-  {
-    unique_lock<mutex> lock(free_buffers_lock_);
-    if (!FLAGS_disable_mem_pools &&
-        free_buffers_[idx].size() < FLAGS_max_free_io_buffers) {
-      // Poison buffers stored in cache.
-      ASAN_POISON_MEMORY_REGION(buffer, buffer_size);
-      free_buffers_[idx].push_back(buffer);
-      if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != nullptr) {
-        ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(1L);
-      }
-      // This consume call needs to be protected by 'free_buffers_lock_' to 
avoid a race
-      // with a Release() call for the same buffer that could make consumption 
negative.
-      // Note: we can't use TryConsume(), which can indirectly call 
GcIoBuffers().
-      // TODO: after IMPALA-3200 is completed, we should be able to leverage 
the buffer
-      // pool's free lists, and remove these free lists.
-      free_buffer_mem_tracker_->Consume(buffer_size);
-    } else {
-      num_allocated_buffers_.Add(-1);
-      delete[] buffer;
-      if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != nullptr) {
-        ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(-1L);
-      }
-      if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != nullptr) {
-        ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(-buffer_size);
-      }
-    }
-  }
-
-  // We transferred the buffer ownership from the BufferDescriptor to the 
DiskIoMgr.
-  desc->mem_tracker_->Release(buffer_size);
-  desc->buffer_ = nullptr;
-}
-
 // This function gets the next RequestRange to work on for this disk. It 
checks for
 // cancellation and
 // a) Updates ready_to_start_ranges if there are no scan ranges queued for 
this disk.
@@ -785,7 +480,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, 
RequestRange** range,
       disk_queue->request_contexts.pop_front();
       DCHECK(*request_context != nullptr);
       request_disk_state = &((*request_context)->disk_states_[disk_id]);
-      request_disk_state->IncrementRequestThreadAndDequeue();
+      request_disk_state->IncrementDiskThreadAndDequeue();
     }
 
     // NOTE: no locks were taken in between.  We need to be careful about what 
state
@@ -793,27 +488,13 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* 
disk_queue, RequestRange** range,
     // There are some invariants here.  Only one disk thread can have the
     // same reader here (the reader is removed from the queue).  There can be
     // other disk threads operating on this reader in other functions though.
-
-    // We just picked a reader. Before we may allocate a buffer on its behalf, 
check that
-    // it has not exceeded any memory limits (e.g. the query or process limit).
-    // TODO: once IMPALA-3200 is fixed, we should be able to remove the free 
lists and
-    // move these memory limit checks to GetFreeBuffer().
-    // Note that calling AnyLimitExceeded() can result in a call to 
GcIoBuffers().
-    // TODO: IMPALA-3209: we should not force a reader over its memory limit by
-    // pushing more buffers to it. Most readers can make progress and operate 
within
-    // a fixed memory limit.
-    if ((*request_context)->mem_tracker_ != nullptr
-        && (*request_context)->mem_tracker_->AnyLimitExceeded()) {
-      (*request_context)->Cancel(Status::MemLimitExceeded());
-    }
-
     unique_lock<mutex> request_lock((*request_context)->lock_);
     VLOG_FILE << "Disk (id=" << disk_id << ") reading for "
         << (*request_context)->DebugString();
 
     // Check if reader has been cancelled
     if ((*request_context)->state_ == RequestContext::Cancelled) {
-      request_disk_state->DecrementRequestThreadAndCheckDone(*request_context);
+      request_disk_state->DecrementDiskThread(request_lock, *request_context);
       continue;
     }
 
@@ -856,7 +537,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, 
RequestRange** range,
 
     // There are no inflight ranges, nothing to do.
     if (request_disk_state->in_flight_ranges()->empty()) {
-      request_disk_state->DecrementRequestThread();
+      request_disk_state->DecrementDiskThread(request_lock, *request_context);
       continue;
     }
     DCHECK_GT(request_disk_state->num_remaining_ranges(), 0);
@@ -865,7 +546,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, 
RequestRange** range,
 
     // Now that we've picked a request range, put the context back on the 
queue so
     // another thread can pick up another request range for this context.
-    request_disk_state->ScheduleContext(*request_context, disk_id);
+    request_disk_state->ScheduleContext(request_lock, *request_context, 
disk_id);
     DCHECK((*request_context)->Validate()) << endl << 
(*request_context)->DebugString();
     return true;
   }
@@ -879,81 +560,67 @@ void DiskIoMgr::HandleWriteFinished(
   // Copy disk_id before running callback: the callback may modify write_range.
   int disk_id = write_range->disk_id_;
 
-  // Execute the callback before decrementing the thread count. Otherwise 
CancelContext()
-  // that waits for the disk ref count to be 0 will return, creating a race, 
e.g. see
-  // IMPALA-1890.
+  // Execute the callback before decrementing the thread count. Otherwise
+  // RequestContext::Cancel(), which waits for the disk ref count to drop to 0, could
+  // return before the callback has run, creating a race (see IMPALA-1890).
   // The status of the write does not affect the status of the writer context.
   write_range->callback_(write_status);
   {
     unique_lock<mutex> writer_lock(writer->lock_);
     DCHECK(writer->Validate()) << endl << writer->DebugString();
     RequestContext::PerDiskState& state = writer->disk_states_[disk_id];
-    if (writer->state_ == RequestContext::Cancelled) {
-      state.DecrementRequestThreadAndCheckDone(writer);
-    } else {
-      state.DecrementRequestThread();
-    }
+    state.DecrementDiskThread(writer_lock, writer);
     --state.num_remaining_ranges();
   }
 }
 
 void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, RequestContext* 
reader,
-    unique_ptr<BufferDescriptor> buffer) {
+    Status read_status, unique_ptr<BufferDescriptor> buffer) {
   unique_lock<mutex> reader_lock(reader->lock_);
 
-  RequestContext::PerDiskState& state = 
reader->disk_states_[disk_queue->disk_id];
+  RequestContext::PerDiskState* disk_state = 
&reader->disk_states_[disk_queue->disk_id];
   DCHECK(reader->Validate()) << endl << reader->DebugString();
-  DCHECK_GT(state.num_threads_in_op(), 0);
-  DCHECK(buffer->buffer_ != nullptr);
-
-  if (reader->state_ == RequestContext::Cancelled) {
-    state.DecrementRequestThreadAndCheckDone(reader);
-    DCHECK(reader->Validate()) << endl << reader->DebugString();
-    if (!buffer->is_client_buffer()) FreeBufferMemory(buffer.get());
-    buffer->buffer_ = nullptr;
-    ScanRange* scan_range = buffer->scan_range_;
-    scan_range->Cancel(reader->status_);
-    // Enqueue the buffer to use the scan range's buffer cleanup path.
-    scan_range->EnqueueBuffer(reader_lock, move(buffer));
-    return;
-  }
-
-  DCHECK_EQ(reader->state_, RequestContext::Active);
+  DCHECK_GT(disk_state->num_threads_in_op(), 0);
   DCHECK(buffer->buffer_ != nullptr);
+  DCHECK(!buffer->is_cached()) << "HDFS cache reads don't go through this code 
path.";
 
-  // Update the reader's scan ranges.  There are a three cases here:
-  //  1. Read error
-  //  2. End of scan range
-  //  3. Middle of scan range
-  if (!buffer->status_.ok()) {
-    // Error case
-    if (!buffer->is_client_buffer()) FreeBufferMemory(buffer.get());
-    buffer->buffer_ = nullptr;
-    buffer->eosr_ = true;
-    --state.num_remaining_ranges();
-    buffer->scan_range_->Cancel(buffer->status_);
-  } else if (buffer->eosr_) {
-    --state.num_remaining_ranges();
-  }
-
-  // After calling EnqueueBuffer(), it is no longer valid to read from buffer.
+  // After calling EnqueueBuffer() below, it is no longer valid to read from 
buffer.
   // Store the state we need before calling EnqueueBuffer().
   bool eosr = buffer->eosr_;
+
+  // TODO: IMPALA-4249: it is safe to touch 'scan_range' until DecrementDiskThread() is
+  // called because all clients of DiskIoMgr keep ScanRange objects alive until they
+  // unregister their RequestContext.
   ScanRange* scan_range = buffer->scan_range_;
-  bool is_cached = buffer->is_cached();
-  bool queue_full = scan_range->EnqueueBuffer(reader_lock, move(buffer));
-  if (eosr) {
-    // For cached buffers, we can't close the range until the cached buffer is 
returned.
-    // Close() is called from DiskIoMgr::ReturnBuffer().
-    if (!is_cached) scan_range->Close();
-  } else {
-    if (queue_full) {
-      reader->blocked_ranges_.Enqueue(scan_range);
-    } else {
-      reader->ScheduleScanRange(scan_range);
+  bool scan_range_done = eosr;
+  if (read_status.ok() && reader->state_ != RequestContext::Cancelled) {
+    DCHECK_EQ(reader->state_, RequestContext::Active);
+    // Read succeeded - update the reader's scan ranges.  There are two cases here:
+    //  1. End of scan range
+    //  2. Middle of scan range
+    bool queue_full = scan_range->EnqueueBuffer(reader_lock, move(buffer));
+    if (!eosr) {
+      if (queue_full) {
+        reader->blocked_ranges_.Enqueue(scan_range);
+      } else {
+        reader->ScheduleScanRange(reader_lock, scan_range);
+      }
     }
+  } else {
+    // The scan range will be cancelled, either because we hit an error or 
because the
+    // request context was cancelled.  The buffer is not needed - we must free 
it.
+    reader->FreeBuffer(buffer.get());
+    reader->num_used_buffers_.Add(-1);
+    // Propagate the error or cancellation by cancelling the scan range.
+    scan_range->Cancel(read_status.ok() ? Status::CANCELLED : read_status);
+    scan_range_done = true;
+  }
+  if (scan_range_done) {
+    scan_range->Close();
+    --disk_state->num_remaining_ranges();
   }
-  state.DecrementRequestThread();
+  DCHECK(reader->Validate()) << endl << reader->DebugString();
+  disk_state->DecrementDiskThread(reader_lock, reader);
 }
 
 void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
@@ -997,10 +664,13 @@ void DiskIoMgr::ReadRange(
   unique_ptr<BufferDescriptor> buffer_desc;
   if (range->external_buffer_tag_ == 
ScanRange::ExternalBufferTag::CLIENT_BUFFER) {
     buffer_desc = unique_ptr<BufferDescriptor>(new BufferDescriptor(this, 
reader, range,
-        range->client_buffer_.data, range->client_buffer_.len, nullptr));
+        range->client_buffer_.data, range->client_buffer_.len));
   } else {
+    DCHECK(range->external_buffer_tag_ == 
ScanRange::ExternalBufferTag::NO_BUFFER)
+        << "This code path does not handle other buffer types, i.e. HDFS cache"
+        << static_cast<int>(range->external_buffer_tag_);
     // Need to allocate a buffer to read into.
-    int64_t buffer_size = ::min(bytes_remaining, 
static_cast<int64_t>(max_buffer_size_));
+    int64_t buffer_size = min(bytes_remaining, max_buffer_size_);
     buffer_desc = TryAllocateNextBufferForRange(disk_queue, reader, range, 
buffer_size);
     if (buffer_desc == nullptr) return;
   }
@@ -1008,8 +678,8 @@ void DiskIoMgr::ReadRange(
 
   // No locks in this section.  Only working on local vars.  We don't want to 
hold a
   // lock across the read call.
-  buffer_desc->status_ = range->Open(detail::is_file_handle_caching_enabled());
-  if (buffer_desc->status_.ok()) {
+  Status read_status = range->Open(detail::is_file_handle_caching_enabled());
+  if (read_status.ok()) {
     // Update counters.
     if (reader->active_read_thread_counter_) {
       reader->active_read_thread_counter_->Add(1L);
@@ -1021,7 +691,7 @@ void DiskIoMgr::ReadRange(
     SCOPED_TIMER(&read_timer_);
     SCOPED_TIMER(reader->read_timer_);
 
-    buffer_desc->status_ = range->Read(buffer_desc->buffer_, 
buffer_desc->buffer_len_,
+    read_status = range->Read(buffer_desc->buffer_, buffer_desc->buffer_len_,
         &buffer_desc->len_, &buffer_desc->eosr_);
     buffer_desc->scan_range_offset_ = range->bytes_read_ - buffer_desc->len_;
 
@@ -1036,29 +706,25 @@ void DiskIoMgr::ReadRange(
   }
 
   // Finished read, update reader/disk based on the results
-  HandleReadFinished(disk_queue, reader, move(buffer_desc));
+  HandleReadFinished(disk_queue, reader, read_status, move(buffer_desc));
 }
 
 unique_ptr<BufferDescriptor> DiskIoMgr::TryAllocateNextBufferForRange(
     DiskQueue* disk_queue, RequestContext* reader, ScanRange* range,
     int64_t buffer_size) {
   DCHECK(reader->mem_tracker_ != nullptr);
+  // TODO: replace this with reservation check (if needed at all).
   bool enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY;
-  if (!enough_memory) {
-    // Low memory, GC all the buffers and try again.
-    GcIoBuffers();
-    enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY;
-  }
 
+  RequestContext::PerDiskState* disk_state = 
&reader->disk_states_[disk_queue->disk_id];
   if (!enough_memory) {
-    RequestContext::PerDiskState& state = 
reader->disk_states_[disk_queue->disk_id];
     unique_lock<mutex> reader_lock(reader->lock_);
 
     // Just grabbed the reader lock, check for cancellation.
     if (reader->state_ == RequestContext::Cancelled) {
       DCHECK(reader->Validate()) << endl << reader->DebugString();
-      state.DecrementRequestThreadAndCheckDone(reader);
-      range->Cancel(reader->status_);
+      disk_state->DecrementDiskThread(reader_lock, reader);
+      range->Cancel(Status::CANCELLED);
       DCHECK(reader->Validate()) << endl << reader->DebugString();
       return nullptr;
     }
@@ -1068,7 +734,7 @@ unique_ptr<BufferDescriptor> 
DiskIoMgr::TryAllocateNextBufferForRange(
       // (it already has one queued). Skip this range and pick it up later.
       range->blocked_on_queue_ = true;
       reader->blocked_ranges_.Enqueue(range);
-      state.DecrementRequestThread();
+      disk_state->DecrementDiskThread(reader_lock, reader);
       return nullptr;
     } else {
       // We need to get a buffer anyway since there are none queued. The query
@@ -1076,7 +742,18 @@ unique_ptr<BufferDescriptor> 
DiskIoMgr::TryAllocateNextBufferForRange(
       // now.
     }
   }
-  unique_ptr<BufferDescriptor> buffer_desc = GetFreeBuffer(reader, range, 
buffer_size);
+  unique_ptr<BufferDescriptor> buffer_desc;
+  Status status = reader->AllocBuffer(range, buffer_size, &buffer_desc);
+  if (!status.ok()) {
+    // Hit memory limit - cancel range.
+    range->Cancel(status);
+    {
+      unique_lock<mutex> reader_lock(reader->lock_);
+      disk_state->DecrementDiskThread(reader_lock, reader);
+      DCHECK(reader->Validate()) << endl << reader->DebugString();
+    }
+    return nullptr;
+  }
   DCHECK(buffer_desc != nullptr);
   return buffer_desc;
 }
@@ -1139,23 +816,10 @@ Status DiskIoMgr::WriteRangeHelper(FILE* file_handle, 
WriteRange* write_range) {
   return Status::OK();
 }
 
-int DiskIoMgr::free_buffers_idx(int64_t buffer_size) {
-  int64_t buffer_size_scaled = BitUtil::Ceil(buffer_size, min_buffer_size_);
-  int idx = BitUtil::Log2Ceiling64(buffer_size_scaled);
-  DCHECK_GE(idx, 0);
-  DCHECK_LT(idx, free_buffers_.size());
-  return idx;
-}
-
 Status DiskIoMgr::AddWriteRange(RequestContext* writer, WriteRange* 
write_range) {
   unique_lock<mutex> writer_lock(writer->lock_);
-
-  if (writer->state_ == RequestContext::Cancelled) {
-    DCHECK(!writer->status_.ok());
-    return writer->status_;
-  }
-
-  writer->AddRequestRange(write_range, false);
+  // No new write ranges may be added once 'writer' has been cancelled.
+  if (writer->state_ == RequestContext::Cancelled) return Status::CANCELLED;
+  writer->AddRequestRange(writer_lock, write_range, false);
   return Status::OK();
 }
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/65680dc4/be/src/runtime/io/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index 4fa078b..20f5d44 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -144,8 +144,8 @@ namespace io {
 ///
 /// As a caller reads from a scan range, these buffers are wrapped in 
BufferDescriptors
 /// and returned to the caller. The caller must always call ReturnBuffer() on 
the buffer
-/// descriptor to allow recycling of the associated buffer (if there is an
-/// IoMgr-allocated or HDFS cached buffer).
+/// descriptor to allow freeing of the associated buffer (if there is an 
IoMgr-allocated
+/// or HDFS cached buffer).
 ///
 /// Caching support:
 /// Scan ranges contain metadata on whether or not it is cached on the DN. In 
that
@@ -207,7 +207,7 @@ class DiskIoMgr : public CacheLineAligned {
   ///  - min_buffer_size: minimum io buffer size (in bytes)
   ///  - max_buffer_size: maximum io buffer size (in bytes). Also the max read 
size.
   DiskIoMgr(int num_disks, int threads_per_rotational_disk,
-      int threads_per_solid_state_disk, int min_buffer_size, int 
max_buffer_size);
+      int threads_per_solid_state_disk, int64_t min_buffer_size, int64_t 
max_buffer_size);
 
   /// Create DiskIoMgr with default configs.
   DiskIoMgr();
@@ -217,7 +217,8 @@ class DiskIoMgr : public CacheLineAligned {
   ~DiskIoMgr();
 
   /// Initialize the IoMgr. Must be called once before any of the other APIs.
-  Status Init(MemTracker* process_mem_tracker) WARN_UNUSED_RESULT;
+  Status Init() WARN_UNUSED_RESULT;
+
 
   /// Allocates tracking structure for a request context.
   /// Register a new request context and return it to the caller. The caller 
must call
@@ -236,12 +237,6 @@ class DiskIoMgr : public CacheLineAligned {
   /// up.
   void UnregisterContext(RequestContext* context);
 
-  /// This function cancels the context asychronously. All outstanding requests
-  /// are aborted and tracking structures cleaned up. This does not need to be
-  /// called if the context finishes normally.
-  /// This will also fail any outstanding GetNext()/Read requests.
-  void CancelContext(RequestContext* context);
-
   /// Adds the scan ranges to the queues. This call is non-blocking. The 
caller must
   /// not deallocate the scan range pointers before UnregisterContext().
   /// If schedule_immediately, the ranges are immediately put on the read queue
@@ -266,52 +261,14 @@ class DiskIoMgr : public CacheLineAligned {
   /// This call is blocking.
   Status GetNextRange(RequestContext* reader, ScanRange** range) 
WARN_UNUSED_RESULT;
 
-  /// Reads the range and returns the result in buffer.
-  /// This behaves like the typical synchronous read() api, blocking until the 
data
-  /// is read. This can be called while there are outstanding ScanRanges and is
-  /// thread safe. Multiple threads can be calling Read() per reader at a time.
-  /// range *cannot* have already been added via AddScanRanges.
-  /// This can only be used if the scan range fits in a single IO buffer (i.e. 
is smaller
-  /// than max_read_buffer_size()) or if reading into a client-provided buffer.
-  Status Read(RequestContext* reader, ScanRange* range,
-      std::unique_ptr<BufferDescriptor>* buffer) WARN_UNUSED_RESULT;
-
-  /// Returns the buffer to the IoMgr. This must be called for every buffer
-  /// returned by GetNext()/Read() that did not return an error. This is 
non-blocking.
-  /// After calling this, the buffer descriptor is invalid and cannot be 
accessed.
-  void ReturnBuffer(std::unique_ptr<BufferDescriptor> buffer);
-
   /// Determine which disk queue this file should be assigned to.  Returns an 
index into
   /// disk_queues_.  The disk_id is the volume ID for the local disk that 
holds the
   /// files, or -1 if unknown.  Flag expected_local is true iff this impalad is
   /// co-located with the datanode for this file.
   int AssignQueue(const char* file, int disk_id, bool expected_local);
 
-  /// TODO: The functions below can be moved to RequestContext.
-  /// Returns the current status of the context.
-  Status context_status(RequestContext* context) const WARN_UNUSED_RESULT;
-
-  void set_bytes_read_counter(RequestContext*, RuntimeProfile::Counter*);
-  void set_read_timer(RequestContext*, RuntimeProfile::Counter*);
-  void set_active_read_thread_counter(RequestContext*, 
RuntimeProfile::Counter*);
-  void set_disks_access_bitmap(RequestContext*, RuntimeProfile::Counter*);
-
-  int64_t queue_size(RequestContext* reader) const;
-  int64_t bytes_read_local(RequestContext* reader) const;
-  int64_t bytes_read_short_circuit(RequestContext* reader) const;
-  int64_t bytes_read_dn_cache(RequestContext* reader) const;
-  int num_remote_ranges(RequestContext* reader) const;
-  int64_t unexpected_remote_bytes(RequestContext* reader) const;
-  int cached_file_handles_hit_count(RequestContext* reader) const;
-  int cached_file_handles_miss_count(RequestContext* reader) const;
-
-  /// Returns the read throughput across all readers.
-  /// TODO: should this be a sliding window?  This should report metrics for 
the
-  /// last minute, hour and since the beginning.
-  int64_t GetReadThroughput();
-
   /// Returns the maximum read buffer size
-  int max_read_buffer_size() const { return max_buffer_size_; }
+  int64_t max_read_buffer_size() const { return max_buffer_size_; }
 
   /// Returns the total number of disk queues (both local and remote).
   int num_total_disks() const { return disk_queues_.size(); }
@@ -361,10 +318,6 @@ class DiskIoMgr : public CacheLineAligned {
   Status ReopenCachedHdfsFileHandle(const hdfsFS& fs, std::string* fname, 
int64_t mtime,
       CachedHdfsFileHandle** fid);
 
-  /// Garbage collect unused I/O buffers up to 'bytes_to_free', or all the 
buffers if
-  /// 'bytes_to_free' is -1.
-  void GcIoBuffers(int64_t bytes_to_free = -1);
-
   /// The maximum number of ready buffers that can be queued in a scan range. 
Having two
   /// queued buffers (plus the buffer that is returned to the client) gives 
good
   /// performance in most scenarios:
@@ -403,14 +356,6 @@ class DiskIoMgr : public CacheLineAligned {
   friend class DiskIoMgrTest_Buffers_Test;
   friend class DiskIoMgrTest_VerifyNumThreadsParameter_Test;
 
-  /// Memory tracker for unused I/O buffers owned by DiskIoMgr.
-  boost::scoped_ptr<MemTracker> free_buffer_mem_tracker_;
-
-  /// Memory tracker for I/O buffers where the RequestContext has no 
MemTracker.
-  /// TODO: once IMPALA-3200 is fixed, there should be no more cases where 
readers don't
-  /// provide a MemTracker.
-  boost::scoped_ptr<MemTracker> unowned_buffer_mem_tracker_;
-
   /// Number of worker(read) threads per rotational disk. Also the max depth 
of queued
   /// work to the disk.
   const int num_io_threads_per_rotational_disk_;
@@ -420,10 +365,10 @@ class DiskIoMgr : public CacheLineAligned {
   const int num_io_threads_per_solid_state_disk_;
 
   /// Maximum read size. This is also the maximum size of each allocated 
buffer.
-  const int max_buffer_size_;
+  const int64_t max_buffer_size_;
 
   /// The minimum size of each read buffer.
-  const int min_buffer_size_;
+  const int64_t min_buffer_size_;
 
   /// Thread group containing all the worker threads.
   ThreadGroup disk_thread_group_;
@@ -441,28 +386,6 @@ class DiskIoMgr : public CacheLineAligned {
   /// Total time spent in hdfs reading
   RuntimeProfile::Counter read_timer_;
 
-  /// Protects free_buffers_
-  boost::mutex free_buffers_lock_;
-
-  /// Free buffers that can be handed out to clients. There is one list for 
each buffer
-  /// size, indexed by the Log2 of the buffer size in units of 
min_buffer_size_. The
-  /// maximum buffer size is max_buffer_size_, so the maximum index is
-  /// Log2(max_buffer_size_ / min_buffer_size_).
-  //
-  /// E.g. if min_buffer_size_ = 1024 bytes:
-  ///  free_buffers_[0]  => list of free buffers with size 1024 B
-  ///  free_buffers_[1]  => list of free buffers with size 2048 B
-  ///  free_buffers_[10] => list of free buffers with size 1 MB
-  ///  free_buffers_[13] => list of free buffers with size 8 MB
-  ///  free_buffers_[n]  => list of free buffers with size 2^n * 1024 B
-  std::vector<std::deque<uint8_t*>> free_buffers_;
-
-  /// Total number of allocated buffers, used for debugging.
-  AtomicInt32 num_allocated_buffers_;
-
-  /// Total number of buffers in readers
-  AtomicInt32 num_buffers_in_readers_;
-
   /// Per disk queues. This is static and created once at Init() time.  One 
queue is
   /// allocated for each local disk on the system and for each remote 
filesystem type.
   /// It is indexed by disk id.
@@ -478,23 +401,6 @@ class DiskIoMgr : public CacheLineAligned {
   // handles are closed.
   FileHandleCache file_handle_cache_;
 
-  /// Returns the index into free_buffers_ for a given buffer size
-  int free_buffers_idx(int64_t buffer_size);
-
-  /// Returns a buffer to read into with size between 'buffer_size' and
-  /// 'max_buffer_size_', If there is an appropriately-sized free buffer in the
-  /// 'free_buffers_', that is returned, otherwise a new one is allocated.
-  /// The returned *buffer_size must be between 0 and 'max_buffer_size_'.
-  /// The buffer memory is tracked against reader's mem tracker, or
-  /// 'unowned_buffer_mem_tracker_' if the reader does not have one.
-  std::unique_ptr<BufferDescriptor> GetFreeBuffer(
-      RequestContext* reader, ScanRange* range, int64_t buffer_size);
-
-  /// Disassociates the desc->buffer_ memory from 'desc' (which cannot be 
nullptr), either
-  /// freeing it or returning it to 'free_buffers_'. Memory tracking is 
updated to
-  /// reflect the transfer of ownership from desc->mem_tracker_ to the disk 
I/O mgr.
-  void FreeBufferMemory(BufferDescriptor* desc);
-
   /// Disk worker thread loop. This function retrieves the next range to 
process on
   /// the disk queue and invokes ReadRange() or Write() depending on the type 
of Range().
   /// There can be multiple threads per disk running this loop.
@@ -508,10 +414,12 @@ class DiskIoMgr : public CacheLineAligned {
   bool GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
       RequestContext** request_context);
 
-  /// Updates disk queue and reader state after a read is complete. The read 
result
-  /// is captured in the buffer descriptor.
+  /// Updates disk queue and reader state after a read is complete. If the read
+  /// was successful, 'read_status' is ok and 'buffer' contains the result of 
the
+  /// read. If the read failed with an error, 'read_status' contains the error 
and
+  /// 'buffer' has the buffer that was meant to hold the result of the read.
   void HandleReadFinished(DiskQueue* disk_queue, RequestContext* reader,
-      std::unique_ptr<BufferDescriptor> buffer);
+      Status read_status, std::unique_ptr<BufferDescriptor> buffer);
 
   /// Invokes write_range->callback_  after the range has been written and
   /// updates per-disk state and handle state. The status of the write 
OK/RUNTIME_ERROR
@@ -534,13 +442,15 @@ class DiskIoMgr : public CacheLineAligned {
   /// Does not open or close the file that is written.
   Status WriteRangeHelper(FILE* file_handle, WriteRange* write_range) 
WARN_UNUSED_RESULT;
 
-  /// Reads the specified scan range and calls HandleReadFinished when done.
+  /// Reads the specified scan range and calls HandleReadFinished() when done.
   void ReadRange(DiskQueue* disk_queue, RequestContext* reader, ScanRange* 
range);
 
   /// Try to allocate the next buffer for the scan range, returning the new 
buffer
   /// if successful. If 'reader' is cancelled, cancels the range and returns 
nullptr.
   /// If there is memory pressure and buffers are already queued, adds the 
range
-  /// to the blocked ranges and returns nullptr.
+  /// to the blocked ranges and returns nullptr. If buffers are not queued and 
no more
+  /// buffers can be allocated, cancels the range with a MEM_LIMIT_EXCEEDED 
error and
+  /// returns nullptr.
   std::unique_ptr<BufferDescriptor> TryAllocateNextBufferForRange(DiskQueue* 
disk_queue,
       RequestContext* reader, ScanRange* range, int64_t buffer_size);
 };

http://git-wip-us.apache.org/repos/asf/impala/blob/65680dc4/be/src/runtime/io/request-context.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.cc 
b/be/src/runtime/io/request-context.cc
index 287f53a..031b976 100644
--- a/be/src/runtime/io/request-context.cc
+++ b/be/src/runtime/io/request-context.cc
@@ -22,32 +22,100 @@
 using namespace impala;
 using namespace impala::io;
 
-void RequestContext::Cancel(const Status& status) {
-  DCHECK(!status.ok());
+// Describes a buffer of 'buffer_len' bytes associated with 'scan_range' and 'reader'.
+// 'reader' is not checked here; the other pointers must be non-null and the length
+// must be non-negative.
+BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr,
+    RequestContext* reader, ScanRange* scan_range, uint8_t* buffer,
+    int64_t buffer_len)
+  : io_mgr_(io_mgr),
+    reader_(reader),
+    scan_range_(scan_range),
+    buffer_(buffer),
+    buffer_len_(buffer_len) {
+  DCHECK(io_mgr != nullptr);
+  DCHECK(scan_range != nullptr);
+  DCHECK(buffer != nullptr);
+  DCHECK_GE(buffer_len, 0);
+}
+
+// Allocates a buffer for 'range' and returns it in '*buffer_desc'. 'buffer_size' is
+// clamped to [min_buffer_size_, max_buffer_size_] and rounded up to a power of two.
+// The memory is charged to 'mem_tracker_'; returns an error status without allocating
+// if the tracker's limit would be exceeded or if malloc() fails.
+Status RequestContext::AllocBuffer(ScanRange* range, int64_t buffer_size,
+    unique_ptr<BufferDescriptor>* buffer_desc) {
+  DCHECK(range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER)
+      << static_cast<int>(range->external_buffer_tag_);
+  DCHECK_LE(buffer_size, parent_->max_buffer_size_);
+  DCHECK_GT(buffer_size, 0);
+  buffer_size = BitUtil::RoundUpToPowerOfTwo(
+      max(parent_->min_buffer_size_, min(parent_->max_buffer_size_, buffer_size)));
+
+  DCHECK(mem_tracker_ != nullptr);
+  if (!mem_tracker_->TryConsume(buffer_size)) {
+    return mem_tracker_->MemLimitExceeded(nullptr, "disk I/O buffer", buffer_size);
+  }
+
+  uint8_t* buffer = static_cast<uint8_t*>(malloc(buffer_size));
+  if (buffer == nullptr) {
+    mem_tracker_->Release(buffer_size);  // Undo the TryConsume() above.
+    return Status(TErrorCode::INTERNAL_ERROR,
+        Substitute("Could not malloc buffer of $0 bytes", buffer_size));
+  }
+  buffer_desc->reset(new BufferDescriptor(parent_, this, range, buffer, buffer_size));
+  return Status::OK();
+}
+
+// Frees the memory behind 'buffer' if it is neither a cached buffer nor a client
+// buffer, releasing it from 'mem_tracker_', and clears 'buffer->buffer_' in all cases.
+void RequestContext::FreeBuffer(BufferDescriptor* buffer) {
+  DCHECK(buffer->buffer_ != nullptr);
+  if (!buffer->is_cached() && !buffer->is_client_buffer()) {
+    // Only buffers that were allocated by DiskIoMgr need to have memory freed.
+    free(buffer->buffer_);
+    mem_tracker_->Release(buffer->buffer_len_);
+  }
+  buffer->buffer_ = nullptr;
+}
 
+// Cancellation of a RequestContext requires coordination from multiple threads that may
+// hold references to the context:
+//  1. Disk threads that are currently processing a range for this context.
+//  2. Caller threads that are waiting in GetNext().
+//
+// Each thread that currently has a reference to the request context must notice the
+// cancel, cancel any pending operations involving the context and remove the context from
+// tracking structures. Once no more operations are pending on the context and no more
+// I/O mgr threads hold references to the context, the context can be marked inactive
+// (see CancelAndMarkInactive()), after which the owner of the context object can free
+// it.
+//
+// The steps are:
+// 1. Cancel() will immediately set the context to the Cancelled state. This prevents any
+// other thread from adding more ready buffers to the context (they all take a lock and
+// check the state before doing so), or any write ranges to the context.
+// 2. Cancel() will call Cancel() on each ScanRange that is not yet complete, unblocking
+// any threads in GetNext(). If there was no prior error for a scan range, any reads from
+// that scan range will return a CANCELLED Status. Cancel() also invokes callbacks for
+// all WriteRanges with a CANCELLED Status.
+// 3. Disk threads notice the context is cancelled either when picking the next context
+// to process or when they try to enqueue a ready buffer. Upon noticing the cancelled
+// state, they remove the context from the disk queue. The last thread per disk then calls
+// DecrementDiskRefCount(). After the last disk thread has called DecrementDiskRefCount(),
+// cancellation is done and it is safe to unregister the context.
+void RequestContext::Cancel() {
   // Callbacks are collected in this vector and invoked while no lock is held.
   vector<WriteRange::WriteDoneCallback> write_callbacks;
   {
-    lock_guard<mutex> lock(lock_);
+    unique_lock<mutex> lock(lock_);
     DCHECK(Validate()) << endl << DebugString();
 
     // Already being cancelled
     if (state_ == RequestContext::Cancelled) return;
 
-    DCHECK(status_.ok());
-    status_ = status;
-
     // The reader will be put into a cancelled state until call cleanup is 
complete.
     state_ = RequestContext::Cancelled;
 
     // Cancel all scan ranges for this reader. Each range could be one one of
     // four queues.
     for (int i = 0; i < disk_states_.size(); ++i) {
-      RequestContext::PerDiskState& state = disk_states_[i];
-      RequestRange* range = NULL;
-      while ((range = state.in_flight_ranges()->Dequeue()) != NULL) {
+      PerDiskState& state = disk_states_[i];
+      RequestRange* range = nullptr;
+      while ((range = state.in_flight_ranges()->Dequeue()) != nullptr) {
         if (range->request_type() == RequestType::READ) {
-          static_cast<ScanRange*>(range)->Cancel(status);
+          static_cast<ScanRange*>(range)->Cancel(Status::CANCELLED);
         } else {
           DCHECK(range->request_type() == RequestType::WRITE);
           
write_callbacks.push_back(static_cast<WriteRange*>(range)->callback_);
@@ -55,36 +123,36 @@ void RequestContext::Cancel(const Status& status) {
       }
 
       ScanRange* scan_range;
-      while ((scan_range = state.unstarted_scan_ranges()->Dequeue()) != NULL) {
-        scan_range->Cancel(status);
+      while ((scan_range = state.unstarted_scan_ranges()->Dequeue()) != 
nullptr) {
+        scan_range->Cancel(Status::CANCELLED);
       }
       WriteRange* write_range;
-      while ((write_range = state.unstarted_write_ranges()->Dequeue()) != 
NULL) {
+      while ((write_range = state.unstarted_write_ranges()->Dequeue()) != 
nullptr) {
         write_callbacks.push_back(write_range->callback_);
       }
     }
 
-    ScanRange* range = NULL;
-    while ((range = ready_to_start_ranges_.Dequeue()) != NULL) {
-      range->Cancel(status);
+    ScanRange* range = nullptr;
+    while ((range = ready_to_start_ranges_.Dequeue()) != nullptr) {
+      range->Cancel(Status::CANCELLED);
     }
-    while ((range = blocked_ranges_.Dequeue()) != NULL) {
-      range->Cancel(status);
+    while ((range = blocked_ranges_.Dequeue()) != nullptr) {
+      range->Cancel(Status::CANCELLED);
     }
-    while ((range = cached_ranges_.Dequeue()) != NULL) {
-      range->Cancel(status);
+    while ((range = cached_ranges_.Dequeue()) != nullptr) {
+      range->Cancel(Status::CANCELLED);
     }
 
-    // Schedule reader on all disks. The disks will notice it is cancelled and 
do any
-    // required cleanup
+    // Ensure that the reader is scheduled on all disks (it may already be 
scheduled on
+    // some). The disk threads will notice that the context is cancelled and 
do any
+    // required cleanup for the disk state.
     for (int i = 0; i < disk_states_.size(); ++i) {
-      RequestContext::PerDiskState& state = disk_states_[i];
-      state.ScheduleContext(this, i);
+      disk_states_[i].ScheduleContext(lock, this, i);
     }
   }
 
   for (const WriteRange::WriteDoneCallback& write_callback: write_callbacks) {
-    write_callback(status_);
+    write_callback(Status::CANCELLED);
   }
 
   // Signal reader and unblock the GetNext/Read thread.  That read will fail 
with
@@ -93,7 +161,7 @@ void RequestContext::Cancel(const Status& status) {
 }
 
 void RequestContext::CancelAndMarkInactive() {
-  Cancel(Status::CANCELLED);
+  Cancel();
 
   boost::unique_lock<boost::mutex> l(lock_);
   DCHECK_NE(state_, Inactive);
@@ -105,14 +173,22 @@ void RequestContext::CancelAndMarkInactive() {
   // Validate that no buffers were leaked from this context.
   DCHECK_EQ(num_buffers_in_reader_.Load(), 0) << endl << DebugString();
   DCHECK_EQ(num_used_buffers_.Load(), 0) << endl << DebugString();
+
+  // Validate that no threads are active and the context is not queued.
+  for (const PerDiskState& disk_state : disk_states_) {
+    DCHECK_EQ(0, disk_state.in_flight_ranges()->size()) << endl << 
DebugString();
+    DCHECK_EQ(0, disk_state.unstarted_scan_ranges()->size()) << endl << 
DebugString();
+    DCHECK_EQ(0, disk_state.num_threads_in_op()) << endl << DebugString();
+    DCHECK(!disk_state.is_on_queue()) << endl << DebugString();
+  }
   DCHECK(Validate()) << endl << DebugString();
   state_ = Inactive;
 }
 
-void RequestContext::AddRequestRange(
+void RequestContext::AddRequestRange(const unique_lock<mutex>& lock,
     RequestRange* range, bool schedule_immediately) {
-  // DCHECK(lock_.is_locked()); // TODO: boost should have this API
-  RequestContext::PerDiskState& state = disk_states_[range->disk_id()];
+  DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
+  PerDiskState& state = disk_states_[range->disk_id()];
   if (state.done()) {
     DCHECK_EQ(state.num_remaining_ranges(), 0);
     state.set_done(false);
@@ -123,7 +199,7 @@ void RequestContext::AddRequestRange(
   if (range->request_type() == RequestType::READ) {
     ScanRange* scan_range = static_cast<ScanRange*>(range);
     if (schedule_immediately) {
-      ScheduleScanRange(scan_range);
+      ScheduleScanRange(lock, scan_range);
     } else {
       state.unstarted_scan_ranges()->Enqueue(scan_range);
       num_unstarted_scan_ranges_.Add(1);
@@ -143,7 +219,7 @@ void RequestContext::AddRequestRange(
     schedule_context = true;
   }
 
-  if (schedule_context) state.ScheduleContext(this, range->disk_id());
+  if (schedule_context) state.ScheduleContext(lock, this, range->disk_id());
   ++state.num_remaining_ranges();
 }
 
@@ -159,8 +235,7 @@ string RequestContext::DebugString() const {
   if (state_ == RequestContext::Cancelled) ss << "Cancelled";
   if (state_ == RequestContext::Active) ss << "Active";
   if (state_ != RequestContext::Inactive) {
-    ss << " status_=" << (status_.ok() ? "OK" : status_.GetDetail())
-       << " #ready_buffers=" << num_ready_buffers_.Load()
+    ss << " #ready_buffers=" << num_ready_buffers_.Load()
        << " #used_buffers=" << num_used_buffers_.Load()
        << " #num_buffers_in_reader=" << num_buffers_in_reader_.Load()
        << " #finished_scan_ranges=" << num_finished_ranges_.Load()
@@ -284,10 +359,11 @@ bool RequestContext::Validate() const {
   return true;
 }
 
+// Enqueues 'context' on disk queue 'disk_id' unless it is already on the queue or this
+// per-disk state is marked done. 'context_lock' must hold 'context->lock_'.
-void RequestContext::PerDiskState::ScheduleContext(
+void RequestContext::PerDiskState::ScheduleContext(const unique_lock<mutex>& 
context_lock,
     RequestContext* context, int disk_id) {
-  if (!is_on_queue_ && !done_) {
-    is_on_queue_ = true;
+  DCHECK(context_lock.mutex() == &context->lock_ && context_lock.owns_lock());
+  if (is_on_queue_.Load() == 0 && !done_) {
+    is_on_queue_.Store(1);
     context->parent_->disk_queues_[disk_id]->EnqueueContext(context);
   }
 }

Reply via email to