IMPALA-4835: Part 2: Allocate scan range buffers upfront

This change is a step towards reserving memory for buffers from the
buffer pool and constraining per-scanner memory requirements. It
restructures the DiskIoMgr code so that each ScanRange operates with a
fixed set of buffers that are allocated upfront and recycled as the
I/O mgr works through the ScanRange.
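
As a sketch of the new calling pattern (names as in the diff below;
setup and error handling elided), a client that starts a range itself
now attaches the range's buffers upfront:

  bool needs_buffers;
  RETURN_IF_ERROR(io_mgr->StartScanRange(reader, range, &needs_buffers));
  if (needs_buffers) {
    // Give the range a fixed budget of buffers; the I/O mgr recycles
    // them as it reads through the range.
    RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
        reader, range, 3 * io_mgr->max_buffer_size()));
  }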

One major change is that ScanRanges get blocked when a buffer is not
available and get unblocked when a client returns a buffer via
ReturnBuffer(). I was able to remove the logic to maintain the
blocked_ranges_ list by instead adding a separate set of all active
ranges.
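
A scanner then consumes and recycles the range's buffers in a loop
like the following (condensed from the new tests in the diff; "eosr"
means end of scan range):

  unique_ptr<BufferDescriptor> buffer;
  bool eosr = false;
  do {
    RETURN_IF_ERROR(range->GetNext(&buffer));
    // ... process buffer->buffer() / buffer->len() ...
    eosr = buffer->eosr();
    // Recycling the buffer may unblock the range.
    range->ReturnBuffer(move(buffer));
  } while (!eosr);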

There is also some miscellaneous cleanup included - e.g. reducing the
amount of code devoted to maintaining counters and metrics.
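
For instance, the repeated null-checked counter updates collapse into
macros (from disk-io-mgr-internal.h in the diff):

  COUNTER_ADD_IF_NOT_NULL(reader->active_read_thread_counter_, 1L);
  COUNTER_BITOR_IF_NOT_NULL(reader->disks_accessed_bitmap_,
      1LL << disk_queue->disk_id);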

One tricky part of the existing code was that it called
IssueInitialRanges() with empty lists of files and depended on
DiskIoMgr::AddScanRanges() not checking for cancellation in that case.
See IMPALA-6564. I changed the logic to not try to issue ranges for
empty lists of files.
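
The issuing logic now skips formats with no matching files, roughly
(condensed from the hdfs-scan-node-base.cc hunk below):

  for (const auto& entry : matching_per_type_files) {
    if (entry.second.empty()) continue;  // avoids spurious CANCELLED
    // ... dispatch to IssueInitialRanges() for entry.first ...
  }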

I plan to merge this along with the actual buffer pool switch, but
separated it out to allow review of the DiskIoMgr changes separate from
other aspects of the buffer pool switchover.

Testing:
* Ran core and exhaustive tests.

Change-Id: Ia243bf8b62feeea602b122e0503ea4ba7d3ee70f
Reviewed-on: http://gerrit.cloudera.org:8080/8707
Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com>
Tested-by: Tim Armstrong <tarmstr...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/9416


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/0b6fab73
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/0b6fab73
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/0b6fab73

Branch: refs/heads/2.x
Commit: 0b6fab7315c6894d4f96d52fb022305be63884a2
Parents: 3b3bf87
Author: Tim Armstrong <tarmstr...@cloudera.com>
Authored: Wed Nov 29 11:30:51 2017 -0800
Committer: Tim Armstrong <tarmstr...@cloudera.com>
Committed: Fri Feb 23 22:51:01 2018 +0000

----------------------------------------------------------------------
 be/src/exec/base-sequence-scanner.cc         |   1 +
 be/src/exec/base-sequence-scanner.h          |   3 +-
 be/src/exec/hdfs-lzo-text-scanner.cc         |   1 +
 be/src/exec/hdfs-parquet-scanner.cc          |  25 +-
 be/src/exec/hdfs-parquet-scanner.h           |   6 +-
 be/src/exec/hdfs-scan-node-base.cc           |  34 ++-
 be/src/exec/hdfs-scan-node-mt.cc             |  13 +-
 be/src/exec/hdfs-scan-node.cc                |  32 ++-
 be/src/exec/scanner-context.cc               |  15 +-
 be/src/runtime/bufferpool/buffer-pool.h      |   1 +
 be/src/runtime/io/disk-io-mgr-internal.h     |  16 ++
 be/src/runtime/io/disk-io-mgr-stress-test.cc |  13 +-
 be/src/runtime/io/disk-io-mgr-stress.cc      |  12 +-
 be/src/runtime/io/disk-io-mgr-test.cc        | 232 +++++++++++++++--
 be/src/runtime/io/disk-io-mgr.cc             | 295 +++++++++++-----------
 be/src/runtime/io/disk-io-mgr.h              | 257 ++++++++++---------
 be/src/runtime/io/request-context.cc         | 152 +++++------
 be/src/runtime/io/request-context.h          | 153 ++++++-----
 be/src/runtime/io/request-ranges.h           |  94 +++++--
 be/src/runtime/io/scan-range.cc              | 205 ++++++++++-----
 be/src/runtime/tmp-file-mgr.cc               |  14 +-
 be/src/runtime/tmp-file-mgr.h                |  17 +-
 be/src/util/bit-util-test.cc                 |  11 +
 be/src/util/bit-util.h                       |   8 +-
 24 files changed, 1020 insertions(+), 590 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/0b6fab73/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index 9cb6330..9d95b0b 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -46,6 +46,7 @@ static const int MIN_SYNC_READ_SIZE = 64 * 1024; // bytes
 
 Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     const vector<HdfsFileDesc*>& files) {
+  DCHECK(!files.empty());
   // Issue just the header range for each file.  When the header is complete,
   // we'll issue the splits for that file.  Splits cannot be processed until the
   // header is parsed (the header object is then shared across splits for that file).

http://git-wip-us.apache.org/repos/asf/impala/blob/0b6fab73/be/src/exec/base-sequence-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.h b/be/src/exec/base-sequence-scanner.h
index 887ff6f..3c2326e 100644
--- a/be/src/exec/base-sequence-scanner.h
+++ b/be/src/exec/base-sequence-scanner.h
@@ -47,7 +47,8 @@ class ScannerContext;
 /// situation, causing the block to be incorrectly skipped.
 class BaseSequenceScanner : public HdfsScanner {
  public:
-  /// Issue the initial ranges for all sequence container files.
+  /// Issue the initial ranges for all sequence container files. 'files' must not be
+  /// empty.
   static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
                                    const std::vector<HdfsFileDesc*>& files)
                                    WARN_UNUSED_RESULT;

http://git-wip-us.apache.org/repos/asf/impala/blob/0b6fab73/be/src/exec/hdfs-lzo-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-lzo-text-scanner.cc b/be/src/exec/hdfs-lzo-text-scanner.cc
index 88ae295..8af89f2 100644
--- a/be/src/exec/hdfs-lzo-text-scanner.cc
+++ b/be/src/exec/hdfs-lzo-text-scanner.cc
@@ -62,6 +62,7 @@ HdfsScanner* HdfsLzoTextScanner::GetHdfsLzoTextScanner(
 
 Status HdfsLzoTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     const vector<HdfsFileDesc*>& files) {
+  DCHECK(!files.empty());
   if (LzoIssueInitialRanges == NULL) {
     lock_guard<SpinLock> l(lzo_load_lock_);
     if (library_load_status_.ok()) {

http://git-wip-us.apache.org/repos/asf/impala/blob/0b6fab73/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index bb3d091..0188f08 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -69,6 +69,7 @@ const string PARQUET_MEM_LIMIT_EXCEEDED =
 
 Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     const vector<HdfsFileDesc*>& files) {
+  DCHECK(!files.empty());
   vector<ScanRange*> footer_ranges;
   for (int i = 0; i < files.size(); ++i) {
     // If the file size is less than 12 bytes, it is an invalid Parquet file.
@@ -1439,8 +1440,10 @@ Status HdfsParquetScanner::ProcessFooter() {
         BufferOpts::ReadInto(metadata_buffer.buffer(), metadata_size));
 
     unique_ptr<BufferDescriptor> io_buffer;
-    RETURN_IF_ERROR(
-        io_mgr->AddScanRange(scan_node_->reader_context(), metadata_range, true));
+    bool needs_buffers;
+    RETURN_IF_ERROR(io_mgr->StartScanRange(
+          scan_node_->reader_context(), metadata_range, &needs_buffers));
+    DCHECK(!needs_buffers) << "Already provided a buffer";
     RETURN_IF_ERROR(metadata_range->GetNext(&io_buffer));
     DCHECK_EQ(io_buffer->buffer(), metadata_buffer.buffer());
     DCHECK_EQ(io_buffer->len(), metadata_size);
@@ -1735,12 +1738,18 @@ Status HdfsParquetScanner::InitScalarColumns(
   }
   DCHECK_EQ(col_ranges.size(), num_scalar_readers);
 
-  // Issue all the column chunks to the io mgr and have them scheduled immediately.
-  // This means these ranges aren't returned via DiskIoMgr::GetNextRange and
-  // instead are scheduled to be read immediately.
-  RETURN_IF_ERROR(scan_node_->runtime_state()->io_mgr()->AddScanRanges(
-      scan_node_->reader_context(), col_ranges, true));
-
+  DiskIoMgr* io_mgr = scan_node_->runtime_state()->io_mgr();
+  // Issue all the column chunks to the IoMgr. We scan through all columns at the same
+  // time so need to read from all of them concurrently.
+  for (ScanRange* col_range : col_ranges) {
+    bool needs_buffers;
+    RETURN_IF_ERROR(io_mgr->StartScanRange(
+        scan_node_->reader_context(), col_range, &needs_buffers));
+    if (needs_buffers) {
+      RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
+          scan_node_->reader_context(), col_range, 3 * io_mgr->max_buffer_size()));
+    }
+  }
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/0b6fab73/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index f0043b5..1fc3239 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -69,8 +69,8 @@ class BoolColumnReader;
 /// the split size, the mid point guarantees that we have at least 50% of the row group in
 /// the current split. ProcessSplit() then computes the column ranges for these row groups
 /// and submits them to the IoMgr for immediate scheduling (so they don't surface in
-/// DiskIoMgr::GetNextRange()). Scheduling them immediately also guarantees they are all
-/// read at once.
+/// DiskIoMgr::GetNextUnstartedRange()). Scheduling them immediately also guarantees they
+/// are all read at once.
 ///
 /// Like the other scanners, each parquet scanner object is one to one with a
 /// ScannerContext. Unlike the other scanners though, the context will have 
multiple
@@ -328,7 +328,7 @@ class HdfsParquetScanner : public HdfsScanner {
   virtual ~HdfsParquetScanner() {}
 
   /// Issue just the footer range for each file.  We'll then parse the footer and pick
-  /// out the columns we want.
+  /// out the columns we want. 'files' must not be empty.
   static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
                                    const std::vector<HdfsFileDesc*>& files)
                                    WARN_UNUSED_RESULT;

http://git-wip-us.apache.org/repos/asf/impala/blob/0b6fab73/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 0660b9b..861d5dc 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -449,18 +449,27 @@ Status HdfsScanNodeBase::IssueInitialScanRanges(RuntimeState* state) {
     }
   }
 
-  // Issue initial ranges for all file types.
-  RETURN_IF_ERROR(HdfsParquetScanner::IssueInitialRanges(this,
-      matching_per_type_files[THdfsFileFormat::PARQUET]));
-  RETURN_IF_ERROR(HdfsTextScanner::IssueInitialRanges(this,
-      matching_per_type_files[THdfsFileFormat::TEXT]));
-  RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
-      matching_per_type_files[THdfsFileFormat::SEQUENCE_FILE]));
-  RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
-      matching_per_type_files[THdfsFileFormat::RC_FILE]));
-  RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
-      matching_per_type_files[THdfsFileFormat::AVRO]));
-
+  // Issue initial ranges for all file types. Only call functions for file types that
+  // actually exist - trying to add empty lists of ranges can result in spurious
+  // CANCELLED errors - see IMPALA-6564.
+  for (const auto& entry : matching_per_type_files) {
+    if (entry.second.empty()) continue;
+    switch (entry.first) {
+      case THdfsFileFormat::PARQUET:
+        RETURN_IF_ERROR(HdfsParquetScanner::IssueInitialRanges(this, entry.second));
+        break;
+      case THdfsFileFormat::TEXT:
+        RETURN_IF_ERROR(HdfsTextScanner::IssueInitialRanges(this, entry.second));
+        break;
+      case THdfsFileFormat::SEQUENCE_FILE:
+      case THdfsFileFormat::RC_FILE:
+      case THdfsFileFormat::AVRO:
+        RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this, entry.second));
+        break;
+      default:
+        DCHECK(false) << "Unexpected file type " << entry.first;
+    }
+  }
   return Status::OK();
 }
 
@@ -519,6 +528,7 @@ ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
 
 Status HdfsScanNodeBase::AddDiskIoRanges(
     const vector<ScanRange*>& ranges, int num_files_queued) {
+  DCHECK(!progress_.done()) << "Don't call AddScanRanges() after all ranges finished.";
   RETURN_IF_ERROR(runtime_state_->io_mgr()->AddScanRanges(reader_context_.get(), ranges));
   num_unqueued_files_.Add(-num_files_queued);
   DCHECK_GE(num_unqueued_files_.Load(), 0);

http://git-wip-us.apache.org/repos/asf/impala/blob/0b6fab73/be/src/exec/hdfs-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index 7ea4d80..be75677 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -26,6 +26,7 @@
 
 #include "gen-cpp/PlanNodes_types.h"
 
+using namespace impala::io;
 using std::stringstream;
 
 namespace impala {
@@ -76,13 +77,19 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
       scanner_->Close(row_batch);
       scanner_.reset();
     }
-    RETURN_IF_ERROR(
-        runtime_state_->io_mgr()->GetNextRange(reader_context_.get(), &scan_range_));
-    if (scan_range_ == NULL) {
+    DiskIoMgr* io_mgr = runtime_state_->io_mgr();
+    bool needs_buffers;
+    RETURN_IF_ERROR(io_mgr->GetNextUnstartedRange(
+        reader_context_.get(), &scan_range_, &needs_buffers));
+    if (scan_range_ == nullptr) {
       *eos = true;
       StopAndFinalizeCounters();
       return Status::OK();
     }
+    if (needs_buffers) {
+      RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(reader_context_.get(), scan_range_,
+          3 * io_mgr->max_buffer_size()));
+    }
     ScanRangeMetadata* metadata =
         static_cast<ScanRangeMetadata*>(scan_range_->meta_data());
     int64_t partition_id = metadata->partition_id;

http://git-wip-us.apache.org/repos/asf/impala/blob/0b6fab73/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index b32a743..f9d71e9 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -179,10 +179,10 @@ Status HdfsScanNode::Prepare(RuntimeState* state) {
   if (per_type_files_[THdfsFileFormat::PARQUET].size() > 0) {
     // Parquet files require buffers per column
     scanner_thread_bytes_required_ =
-        materialized_slots_.size() * 3 * runtime_state_->io_mgr()->max_read_buffer_size();
+        materialized_slots_.size() * 3 * runtime_state_->io_mgr()->max_buffer_size();
   } else {
     scanner_thread_bytes_required_ =
-        3 * runtime_state_->io_mgr()->max_read_buffer_size();
+        3 * runtime_state_->io_mgr()->max_buffer_size();
   }
   // scanner_thread_bytes_required_ now contains the IoBuffer requirement.
   // Next we add in the other memory the scanner thread will use.
@@ -376,6 +376,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
 void HdfsScanNode::ScannerThread() {
   SCOPED_THREAD_COUNTER_MEASUREMENT(scanner_thread_counters());
   SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
+  DiskIoMgr* io_mgr = runtime_state_->io_mgr();
 
   // Make thread-local copy of filter contexts to prune scan ranges, and to pass to the
   // scanner for finer-grained filtering. Use a thread-local MemPool for the filter
@@ -422,21 +423,28 @@ void HdfsScanNode::ScannerThread() {
     // to return if there's an error.
     ranges_issued_barrier_.Wait(SCANNER_THREAD_WAIT_TIME_MS, &unused);
 
-    ScanRange* scan_range;
-    // Take a snapshot of num_unqueued_files_ before calling GetNextRange().
+    // Take a snapshot of num_unqueued_files_ before calling GetNextUnstartedRange().
     // We don't want num_unqueued_files_ to go to zero between the return from
-    // GetNextRange() and the check for when all ranges are complete.
+    // GetNextUnstartedRange() and the check for when all ranges are complete.
     int num_unqueued_files = num_unqueued_files_.Load();
     // TODO: the Load() acts as an acquire barrier.  Is this needed? (i.e. any earlier
     // stores that need to complete?)
     AtomicUtil::MemoryBarrier();
+    ScanRange* scan_range;
+    bool needs_buffers;
     Status status =
-        runtime_state_->io_mgr()->GetNextRange(reader_context_.get(), &scan_range);
+        io_mgr->GetNextUnstartedRange(reader_context_.get(), &scan_range, &needs_buffers);
 
-    if (status.ok() && scan_range != NULL) {
-      // Got a scan range. Process the range end to end (in this thread).
-      status = ProcessSplit(filter_status.ok() ? filter_ctxs : vector<FilterContext>(),
-          &expr_results_pool, scan_range);
+    if (status.ok() && scan_range != nullptr) {
+      if (needs_buffers) {
+        status = io_mgr->AllocateBuffersForRange(
+            reader_context_.get(), scan_range, 3 * io_mgr->max_buffer_size());
+      }
+      if (status.ok()) {
+        // Got a scan range. Process the range end to end (in this thread).
+        status = ProcessSplit(filter_status.ok() ? filter_ctxs : vector<FilterContext>(),
+            &expr_results_pool, scan_range);
+      }
     }
 
     if (!status.ok()) {
@@ -466,8 +474,8 @@ void HdfsScanNode::ScannerThread() {
       // TODO: Based on the usage pattern of all_ranges_started_, it looks like it is not
       // needed to acquire the lock in x86.
       unique_lock<mutex> l(lock_);
-      // All ranges have been queued and GetNextRange() returned NULL. This means that
-      // every range is either done or being processed by another thread.
+      // All ranges have been queued and GetNextUnstartedRange() returned NULL. This means
+      // that every range is either done or being processed by another thread.
       all_ranges_started_ = true;
       break;
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/0b6fab73/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 79e7a85..0abf82f 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -101,6 +101,7 @@ void ScannerContext::Stream::ReleaseCompletedResources(bool done) {
 
 Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
   DCHECK_EQ(0, io_buffer_bytes_left_);
+  DiskIoMgr* io_mgr = parent_->state_->io_mgr();
   if (UNLIKELY(parent_->cancelled())) return Status::CANCELLED;
   if (io_buffer_ != nullptr) ReturnIoBuffer();
 
@@ -121,7 +122,7 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
     SCOPED_TIMER(parent_->state_->total_storage_wait_timer());
 
     int64_t read_past_buffer_size = 0;
-    int64_t max_buffer_size = parent_->state_->io_mgr()->max_read_buffer_size();
+    int64_t max_buffer_size = io_mgr->max_buffer_size();
     if (!read_past_size_cb_.empty()) read_past_buffer_size = read_past_size_cb_(offset);
     if (read_past_buffer_size <= 0) {
       // Either no callback was set or the callback did not return an estimate. Use
@@ -143,8 +144,16 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
     ScanRange* range = parent_->scan_node_->AllocateScanRange(
         scan_range_->fs(), filename(), read_past_buffer_size, offset, partition_id,
         scan_range_->disk_id(), false, BufferOpts::Uncached());
-    RETURN_IF_ERROR(parent_->state_->io_mgr()->AddScanRange(
-        parent_->scan_node_->reader_context(), range, true));
+    bool needs_buffers;
+    RETURN_IF_ERROR(io_mgr->StartScanRange(
+        parent_->scan_node_->reader_context(), range, &needs_buffers));
+    if (needs_buffers) {
+      // Allocate fresh buffers. The buffers for 'scan_range_' should be released now
+      // since we hit EOS.
+      RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
+          parent_->scan_node_->reader_context(), range,
+          3 * io_mgr->max_buffer_size()));
+    }
     RETURN_IF_ERROR(range->GetNext(&io_buffer_));
     DCHECK(io_buffer_->eosr());
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/0b6fab73/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index 285aacb..d14da63 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -37,6 +37,7 @@
 
 namespace impala {
 
+class MemTracker;
 class ReservationTracker;
 class RuntimeProfile;
 class SystemAllocator;

http://git-wip-us.apache.org/repos/asf/impala/blob/0b6fab73/be/src/runtime/io/disk-io-mgr-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-internal.h b/be/src/runtime/io/disk-io-mgr-internal.h
index 3fc3895..2d32487 100644
--- a/be/src/runtime/io/disk-io-mgr-internal.h
+++ b/be/src/runtime/io/disk-io-mgr-internal.h
@@ -35,9 +35,25 @@
 #include "util/filesystem-util.h"
 #include "util/hdfs-util.h"
 #include "util/impalad-metrics.h"
+#include "util/runtime-profile-counters.h"
 
 /// This file contains internal structures shared between submodules of the IoMgr. Users
 /// of the IoMgr do not need to include this file.
+
+// Macros to work around counters sometimes not being provided.
+// TODO: fix things so that counters are always non-NULL.
+#define COUNTER_ADD_IF_NOT_NULL(c, v) \
+  do { \
+    ::impala::RuntimeProfile::Counter* __ctr__ = (c); \
+    if (__ctr__ != nullptr) __ctr__->Add(v); \
+ } while (false);
+
+#define COUNTER_BITOR_IF_NOT_NULL(c, v) \
+  do { \
+    ::impala::RuntimeProfile::Counter* __ctr__ = (c); \
+    if (__ctr__ != nullptr) __ctr__->BitOr(v); \
+ } while (false);
+
 namespace impala {
 namespace io {
 

http://git-wip-us.apache.org/repos/asf/impala/blob/0b6fab73/be/src/runtime/io/disk-io-mgr-stress-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress-test.cc b/be/src/runtime/io/disk-io-mgr-stress-test.cc
index 45b36ed..0e41a6f 100644
--- a/be/src/runtime/io/disk-io-mgr-stress-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress-test.cc
@@ -16,7 +16,10 @@
 // under the License.
 
 #include "runtime/io/disk-io-mgr-stress.h"
-#include "util/cpu-info.h"
+
+#include "common/init.h"
+#include "runtime/test-env.h"
+#include "service/fe-support.h"
 #include "util/string-parser.h"
 
 #include "common/names.h"
@@ -35,10 +38,10 @@ const int NUM_CLIENTS = 10;
 const bool TEST_CANCELLATION = true;
 
 int main(int argc, char** argv) {
-  google::InitGoogleLogging(argv[0]);
-  CpuInfo::Init();
-  OsInfo::Init();
-  impala::InitThreading();
+  InitCommonRuntime(argc, argv, true, TestInfo::BE_TEST);
+  InitFeSupport();
+  TestEnv test_env;
+  ABORT_IF_ERROR(test_env.Init());
   int duration_sec = DEFAULT_DURATION_SEC;
 
   if (argc == 2) {

http://git-wip-us.apache.org/repos/asf/impala/blob/0b6fab73/be/src/runtime/io/disk-io-mgr-stress.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc
index cfe71ab..ba1ad92 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress.cc
@@ -38,6 +38,9 @@ static const int MAX_FILE_LEN = 1024;
 static const int MIN_READ_BUFFER_SIZE = 64;
 static const int MAX_READ_BUFFER_SIZE = 128;
 
+// Maximum bytes to allocate per scan range.
+static const int MAX_BUFFER_BYTES_PER_SCAN_RANGE = MAX_READ_BUFFER_SIZE * 3;
+
 static const int CANCEL_READER_PERIOD_MS = 20;  // in ms
 
 static void CreateTempFile(const char* filename, const char* data) {
@@ -110,9 +113,16 @@ void DiskIoMgrStress::ClientThread(int client_id) {
 
     while (!eos) {
       ScanRange* range;
-      Status status = io_mgr_->GetNextRange(client->reader.get(), &range);
+      bool needs_buffers;
+      Status status =
+          io_mgr_->GetNextUnstartedRange(client->reader.get(), &range, &needs_buffers);
       CHECK(status.ok() || status.IsCancelled());
       if (range == NULL) break;
+      if (needs_buffers) {
+        status = io_mgr_->AllocateBuffersForRange(
+            client->reader.get(), range, MAX_BUFFER_BYTES_PER_SCAN_RANGE);
+        CHECK(status.ok()) << status.GetDetail();
+      }
 
       while (true) {
         unique_ptr<BufferDescriptor> buffer;

http://git-wip-us.apache.org/repos/asf/impala/blob/0b6fab73/be/src/runtime/io/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index e099285..95ea184 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -41,7 +41,7 @@ DECLARE_int32(num_remote_hdfs_io_threads);
 DECLARE_int32(num_s3_io_threads);
 DECLARE_int32(num_adls_io_threads);
 
-const int MIN_BUFFER_SIZE = 512;
+const int MIN_BUFFER_SIZE = 128;
 const int MAX_BUFFER_SIZE = 1024;
 const int LARGE_MEM_LIMIT = 1024 * 1024 * 1024;
 
@@ -122,7 +122,12 @@ class DiskIoMgrTest : public testing::Test {
   static void ValidateSyncRead(DiskIoMgr* io_mgr, RequestContext* reader,
       ScanRange* range, const char* expected, int expected_len = -1) {
     unique_ptr<BufferDescriptor> buffer;
-    ASSERT_OK(io_mgr->AddScanRange(reader, range, true));
+    bool needs_buffers;
+    ASSERT_OK(io_mgr->StartScanRange(reader, range, &needs_buffers));
+    if (needs_buffers) {
+      ASSERT_OK(io_mgr->AllocateBuffersForRange(
+          reader, range, io_mgr->max_buffer_size()));
+    }
     ASSERT_OK(range->GetNext(&buffer));
     ASSERT_TRUE(buffer != nullptr);
     EXPECT_EQ(buffer->len(), range->len());
@@ -161,9 +166,14 @@ class DiskIoMgrTest : public testing::Test {
     int num_ranges = 0;
     while (max_ranges == 0 || num_ranges < max_ranges) {
       ScanRange* range;
-      Status status = io_mgr->GetNextRange(reader, &range);
+      bool needs_buffers;
+      Status status = io_mgr->GetNextUnstartedRange(reader, &range, &needs_buffers);
       ASSERT_TRUE(status.ok() || status.code() == expected_status.code());
       if (range == nullptr) break;
+      if (needs_buffers) {
+        ASSERT_OK(io_mgr->AllocateBuffersForRange(
+            reader, range, io_mgr->max_buffer_size() * 3));
+      }
       ValidateScanRange(io_mgr, range, expected_result, expected_len, expected_status);
       num_ranges_processed->Add(1);
       ++num_ranges;
@@ -509,7 +519,6 @@ TEST_F(DiskIoMgrTest, SyncReadTest) {
       // Issue some reads before the async ones are issued
       ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
       ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
-
       vector<ScanRange*> ranges;
       for (int i = 0; i < len; ++i) {
         int disk_id = i % num_disks;
@@ -641,15 +650,23 @@ TEST_F(DiskIoMgrTest, MemLimits) {
 
     bool hit_mem_limit_exceeded = false;
     char result[strlen(data) + 1];
-    // Keep reading new ranges without returning buffers. This forces us
-    // to go over the limit eventually.
+    // Keep starting new ranges without returning buffers. This forces us to go over
+    // the limit eventually.
     while (true) {
       memset(result, 0, strlen(data) + 1);
       ScanRange* range = nullptr;
-      Status status = io_mgr.GetNextRange(reader.get(), &range);
+      bool needs_buffers;
+      Status status = io_mgr.GetNextUnstartedRange(reader.get(), &range, &needs_buffers);
       ASSERT_TRUE(status.ok() || status.IsMemLimitExceeded());
       hit_mem_limit_exceeded |= status.IsMemLimitExceeded();
       if (range == nullptr) break;
+      DCHECK(needs_buffers);
+      status = io_mgr.AllocateBuffersForRange(reader.get(), range, MAX_BUFFER_SIZE * 3);
+      ASSERT_TRUE(status.ok() || status.IsMemLimitExceeded());
+      if (status.IsMemLimitExceeded()) {
+        hit_mem_limit_exceeded = true;
+        continue;
+      }
 
       while (true) {
         unique_ptr<BufferDescriptor> buffer;
@@ -925,34 +942,140 @@ TEST_F(DiskIoMgrTest, PartialRead) {
   const char* data = "the quick brown fox jumped over the lazy dog";
   int len = strlen(data);
   int read_len = len + 1000; // Read past end of file.
+  // Test with various buffer sizes to exercise different code paths, e.g.
+  // * the truncated data ends exactly on a buffer boundary
+  // * the data is split between many buffers
+  // * the data fits in one buffer
+  const int64_t MIN_BUFFER_SIZE = 2;
+  vector<int64_t> max_buffer_sizes{4, 16, 32, 128, 1024, 4096};
   CreateTempFile(tmp_file, data);
 
   // Get mtime for file
   struct stat stat_val;
   stat(tmp_file, &stat_val);
 
-  scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, 1, read_len, read_len));
+  for (int64_t max_buffer_size : max_buffer_sizes) {
+    DiskIoMgr io_mgr(1, 1, 1, MIN_BUFFER_SIZE, max_buffer_size);
 
-  ASSERT_OK(io_mgr->Init());
+    ASSERT_OK(io_mgr.Init());
+    MemTracker reader_mem_tracker;
+    unique_ptr<RequestContext> reader;
+    reader = io_mgr.RegisterContext(&reader_mem_tracker);
+
+    // We should not read past the end of file.
+    ScanRange* range = InitRange(tmp_file, 0, read_len, 0, stat_val.st_mtime);
+    unique_ptr<BufferDescriptor> buffer;
+    bool needs_buffers;
+    ASSERT_OK(io_mgr.StartScanRange(reader.get(), range, &needs_buffers));
+    if (needs_buffers) {
+      ASSERT_OK(io_mgr.AllocateBuffersForRange(reader.get(), range, 3 * max_buffer_size));
+    }
+
+    int64_t bytes_read = 0;
+    bool eosr = false;
+    do {
+      ASSERT_OK(range->GetNext(&buffer));
+      ASSERT_GE(buffer->buffer_len(), MIN_BUFFER_SIZE);
+      ASSERT_LE(buffer->buffer_len(), max_buffer_size);
+      ASSERT_LE(buffer->len(), len - bytes_read);
+      ASSERT_TRUE(memcmp(buffer->buffer(), data + bytes_read, buffer->len()) == 0);
+      bytes_read += buffer->len();
+      eosr = buffer->eosr();
+      // Should see eosr if we've read past the end of the file. If the data is an exact
+      // multiple of the max buffer size then we may read to the end of the file without
+      // noticing that it is eosr. Eosr will be returned on the next read in that case.
+      ASSERT_TRUE(bytes_read < len || buffer->eosr()
+          || (buffer->len() == max_buffer_size && len % max_buffer_size == 0))
+          << "max_buffer_size " << max_buffer_size << " bytes_read " << 
bytes_read
+          << "len " << len << " buffer->len() " << buffer->len()
+          << " buffer->buffer_len() " << buffer->buffer_len();
+      ASSERT_TRUE(buffer->len() > 0 || buffer->eosr());
+      range->ReturnBuffer(move(buffer));
+    } while (!eosr);
+
+    io_mgr.UnregisterContext(reader.get());
+    EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+    EXPECT_EQ(mem_tracker.consumption(), 0);
+    pool_.Clear();
+  }
+}
+
+// Test zero-length scan range.
+TEST_F(DiskIoMgrTest, ZeroLengthScanRange) {
+  MemTracker mem_tracker(LARGE_MEM_LIMIT);
+  const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
+  const char* data = "the quick brown fox jumped over the lazy dog";
+  const int64_t MIN_BUFFER_SIZE = 2;
+  const int64_t MAX_BUFFER_SIZE = 1024;
+  CreateTempFile(tmp_file, data);
+
+  // Get mtime for file
+  struct stat stat_val;
+  stat(tmp_file, &stat_val);
+
+  DiskIoMgr io_mgr(1, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
+
+  ASSERT_OK(io_mgr.Init());
   MemTracker reader_mem_tracker;
   unique_ptr<RequestContext> reader;
-  reader = io_mgr->RegisterContext(&reader_mem_tracker);
+  reader = io_mgr.RegisterContext(&reader_mem_tracker);
 
   // We should not read past the end of file.
-  ScanRange* range = InitRange(tmp_file, 0, read_len, 0, stat_val.st_mtime);
-  unique_ptr<BufferDescriptor> buffer;
-  ASSERT_OK(io_mgr->AddScanRange(reader.get(), range, true));
-  ASSERT_OK(range->GetNext(&buffer));
-  ASSERT_TRUE(buffer->eosr());
-  ASSERT_EQ(len, buffer->len());
-  ASSERT_TRUE(memcmp(buffer->buffer(), data, len) == 0);
-  range->ReturnBuffer(move(buffer));
+  ScanRange* range = InitRange(tmp_file, 0, 0, 0, stat_val.st_mtime);
+  bool needs_buffers;
+  Status status = io_mgr.StartScanRange(reader.get(), range, &needs_buffers);
+  ASSERT_EQ(TErrorCode::DISK_IO_ERROR, status.code());
 
-  io_mgr->UnregisterContext(reader.get());
-  pool_.Clear();
-  io_mgr.reset();
-  EXPECT_EQ(reader_mem_tracker.consumption(), 0);
-  EXPECT_EQ(mem_tracker.consumption(), 0);
+  status = io_mgr.AddScanRanges(reader.get(), vector<ScanRange*>({range}));
+  ASSERT_EQ(TErrorCode::DISK_IO_ERROR, status.code());
+
+  io_mgr.UnregisterContext(reader.get());
+}
+
+// Test what happens if we don't call AllocateBuffersForRange() after trying to start a
+// range.
+TEST_F(DiskIoMgrTest, SkipAllocateBuffers) {
+  MemTracker mem_tracker(LARGE_MEM_LIMIT);
+  const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
+  const char* data = "the quick brown fox jumped over the lazy dog";
+  int len = strlen(data);
+  const int64_t MIN_BUFFER_SIZE = 2;
+  const int64_t MAX_BUFFER_SIZE = 1024;
+  CreateTempFile(tmp_file, data);
+
+  // Get mtime for file
+  struct stat stat_val;
+  stat(tmp_file, &stat_val);
+
+  DiskIoMgr io_mgr(1, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
+
+  ASSERT_OK(io_mgr.Init());
+  MemTracker reader_mem_tracker;
+  unique_ptr<RequestContext> reader;
+  reader = io_mgr.RegisterContext(&reader_mem_tracker);
+
+  // We should not read past the end of file.
+  vector<ScanRange*> ranges;
+  for (int i = 0; i < 4; ++i) {
+    ranges.push_back(InitRange(tmp_file, 0, len, 0, stat_val.st_mtime));
+  }
+  bool needs_buffers;
+  // Test StartScanRange().
+  ASSERT_OK(io_mgr.StartScanRange(reader.get(), ranges[0], &needs_buffers));
+  EXPECT_TRUE(needs_buffers);
+  ASSERT_OK(io_mgr.StartScanRange(reader.get(), ranges[1], &needs_buffers));
+  EXPECT_TRUE(needs_buffers);
+
+  // Test AddScanRanges()/GetNextUnstartedRange().
+  ASSERT_OK(
+      io_mgr.AddScanRanges(reader.get(), vector<ScanRange*>({ranges[2], ranges[3]})));
+
+  // Cancel two directly, cancel the other two indirectly via the context.
+  ranges[0]->Cancel(Status::CANCELLED);
+  ranges[2]->Cancel(Status::CANCELLED);
+  reader->Cancel();
+
+  io_mgr.UnregisterContext(reader.get());
 }
 
 // Test reading into a client-allocated buffer.
@@ -978,7 +1101,9 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
     ScanRange* range = AllocateRange();
     range->Reset(nullptr, tmp_file, scan_len, 0, 0, true,
         BufferOpts::ReadInto(client_buffer.data(), buffer_len));
-    ASSERT_OK(io_mgr->AddScanRange(reader.get(), range, true));
+    bool needs_buffers;
+    ASSERT_OK(io_mgr->StartScanRange(reader.get(), range, &needs_buffers));
+    ASSERT_FALSE(needs_buffers);
 
     unique_ptr<BufferDescriptor> io_buffer;
     ASSERT_OK(range->GetNext(&io_buffer));
@@ -1016,7 +1141,9 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
     ScanRange* range = AllocateRange();
     range->Reset(nullptr, tmp_file, SCAN_LEN, 0, 0, true,
         BufferOpts::ReadInto(client_buffer.data(), SCAN_LEN));
-    ASSERT_OK(io_mgr->AddScanRange(reader.get(), range, true));
+    bool needs_buffers;
+    ASSERT_OK(io_mgr->StartScanRange(reader.get(), range, &needs_buffers));
+    ASSERT_FALSE(needs_buffers);
 
     /// Also test the cancellation path. Run multiple iterations since it is racy whether
     /// the read fails before the cancellation.
@@ -1053,6 +1180,61 @@ TEST_F(DiskIoMgrTest, VerifyNumThreadsParameter) {
   ASSERT_TRUE(num_io_threads ==
       num_io_threads_per_rotational_or_ssd + num_io_threads_for_remote_disks);
 }
+
+// Test to verify that the correct buffer sizes are chosen given different combinations
+// of scan range lengths and max_bytes values.
+TEST_F(DiskIoMgrTest, BufferSizeSelection) {
+  DiskIoMgr io_mgr(1, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
+  ASSERT_OK(io_mgr.Init());
+
+  // Scan range doesn't fit in max_bytes - allocate as many max-sized buffers as possible.
+  EXPECT_EQ(vector<int64_t>(3, MAX_BUFFER_SIZE),
+      io_mgr.ChooseBufferSizes(10 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE));
+  EXPECT_EQ(vector<int64_t>({MAX_BUFFER_SIZE}),
+      io_mgr.ChooseBufferSizes(10 * MAX_BUFFER_SIZE, MAX_BUFFER_SIZE));
+  EXPECT_EQ(vector<int64_t>(4, MAX_BUFFER_SIZE),
+      io_mgr.ChooseBufferSizes(10 * MAX_BUFFER_SIZE, 4 * MAX_BUFFER_SIZE));
+
+  // Scan range fits in max_bytes - allocate as many max-sized buffers as possible, then
+  // a smaller buffer to fit the remainder.
+  EXPECT_EQ(vector<int64_t>(2, MAX_BUFFER_SIZE),
+      io_mgr.ChooseBufferSizes(2 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE));
+  EXPECT_EQ(vector<int64_t>({MAX_BUFFER_SIZE, MAX_BUFFER_SIZE, MIN_BUFFER_SIZE}),
+      io_mgr.ChooseBufferSizes(2 * MAX_BUFFER_SIZE + 1, 3 * MAX_BUFFER_SIZE));
+  EXPECT_EQ(vector<int64_t>({MAX_BUFFER_SIZE, MAX_BUFFER_SIZE, 2 * MIN_BUFFER_SIZE}),
+      io_mgr.ChooseBufferSizes(
+        2 * MAX_BUFFER_SIZE + MIN_BUFFER_SIZE + 1, 3 * MAX_BUFFER_SIZE));
+  EXPECT_EQ(vector<int64_t>({MAX_BUFFER_SIZE, MAX_BUFFER_SIZE, 2 * MIN_BUFFER_SIZE}),
+      io_mgr.ChooseBufferSizes(
+        2 * MAX_BUFFER_SIZE + 2 * MIN_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE));
+
+  // Scan range is smaller than max buffer size - allocate a single buffer that fits
+  // the range.
+  EXPECT_EQ(vector<int64_t>({MAX_BUFFER_SIZE}),
+      io_mgr.ChooseBufferSizes(MAX_BUFFER_SIZE - 1, 3 * MAX_BUFFER_SIZE));
+  EXPECT_EQ(vector<int64_t>({MAX_BUFFER_SIZE / 2}),
+      io_mgr.ChooseBufferSizes(MAX_BUFFER_SIZE - 1, MAX_BUFFER_SIZE / 2));
+  EXPECT_EQ(vector<int64_t>({MAX_BUFFER_SIZE / 2}),
+      io_mgr.ChooseBufferSizes(MAX_BUFFER_SIZE / 2 - 1, 3 * MAX_BUFFER_SIZE));
+  EXPECT_EQ(vector<int64_t>({MAX_BUFFER_SIZE / 2}),
+      io_mgr.ChooseBufferSizes(MAX_BUFFER_SIZE / 2 - 1, MAX_BUFFER_SIZE / 2));
+  EXPECT_EQ(vector<int64_t>({MIN_BUFFER_SIZE}),
+      io_mgr.ChooseBufferSizes(MIN_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE));
+  EXPECT_EQ(vector<int64_t>({MIN_BUFFER_SIZE}),
+      io_mgr.ChooseBufferSizes(MIN_BUFFER_SIZE, MIN_BUFFER_SIZE));
+
+  // Scan range is smaller than max buffer size and max bytes is smaller still -
+  // should allocate a single smaller buffer.
+  EXPECT_EQ(vector<int64_t>({MAX_BUFFER_SIZE / 4}),
+      io_mgr.ChooseBufferSizes(MAX_BUFFER_SIZE / 2, MAX_BUFFER_SIZE / 2 - 1));
+
+  // Non power-of-two size > max buffer size.
+  EXPECT_EQ(vector<int64_t>({MAX_BUFFER_SIZE, MIN_BUFFER_SIZE}),
+      io_mgr.ChooseBufferSizes(MAX_BUFFER_SIZE + 7, 3 * MAX_BUFFER_SIZE));
+  // Non power-of-two size < min buffer size.
+  EXPECT_EQ(vector<int64_t>({MIN_BUFFER_SIZE}),
+      io_mgr.ChooseBufferSizes(MIN_BUFFER_SIZE - 7, 3 * MAX_BUFFER_SIZE));
+}
 }
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/0b6fab73/be/src/runtime/io/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 8c00ef8..6dda447 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -124,13 +124,6 @@ DEFINE_uint64(unused_file_handle_timeout_sec, 270, "Maximum time, in seconds, th
 DEFINE_uint64(num_file_handle_cache_partitions, 16, "Number of partitions used 
by the "
     "file handle cache.");
 
-// The IoMgr is able to run with a wide range of memory usage. If a query has memory
-// remaining less than this value, the IoMgr will stop all buffering regardless of the
-// current queue size.
-static const int LOW_MEMORY = 64 * 1024 * 1024;
-
-const int DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT;
-
 AtomicInt32 DiskIoMgr::next_disk_id_;
 
 namespace detail {
@@ -197,8 +190,8 @@ DiskIoMgr::DiskIoMgr() :
     num_io_threads_per_solid_state_disk_(GetFirstPositiveVal(
         FLAGS_num_io_threads_per_solid_state_disk, FLAGS_num_threads_per_disk,
         THREADS_PER_SOLID_STATE_DISK)),
-    max_buffer_size_(FLAGS_read_size),
-    min_buffer_size_(FLAGS_min_buffer_size),
+    max_buffer_size_(BitUtil::RoundUpToPowerOfTwo(FLAGS_read_size)),
+    min_buffer_size_(BitUtil::RoundDownToPowerOfTwo(FLAGS_min_buffer_size)),
     shut_down_(false),
     total_bytes_read_counter_(TUnit::BYTES),
     read_timer_(TUnit::TIME_NS),
@@ -223,8 +216,8 @@ DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_rotational_disk,
     int threads_per_solid_state_disk, int64_t min_buffer_size, int64_t max_buffer_size) :
     num_io_threads_per_rotational_disk_(threads_per_rotational_disk),
     num_io_threads_per_solid_state_disk_(threads_per_solid_state_disk),
-    max_buffer_size_(max_buffer_size),
-    min_buffer_size_(min_buffer_size),
+    max_buffer_size_(BitUtil::RoundUpToPowerOfTwo(max_buffer_size)),
+    min_buffer_size_(BitUtil::RoundDownToPowerOfTwo(min_buffer_size)),
     shut_down_(false),
     total_bytes_read_counter_(TUnit::BYTES),
     read_timer_(TUnit::TIME_NS),
@@ -335,80 +328,90 @@ Status DiskIoMgr::ValidateScanRange(ScanRange* range) {
     return Status(TErrorCode::DISK_IO_ERROR,
         Substitute("Invalid scan range. Negative offset $0", range->offset_));
   }
-  if (range->len_ < 0) {
+  if (range->len_ <= 0) {
     return Status(TErrorCode::DISK_IO_ERROR,
-        Substitute("Invalid scan range. Negative length $0", range->len_));
+        Substitute("Invalid scan range. Non-positive length $0", range->len_));
   }
   return Status::OK();
 }
 
-Status DiskIoMgr::AddScanRanges(RequestContext* reader,
-    const vector<ScanRange*>& ranges, bool schedule_immediately) {
-  if (ranges.empty()) return Status::OK();
-
+Status DiskIoMgr::AddScanRanges(
+    RequestContext* reader, const vector<ScanRange*>& ranges) {
   // Validate and initialize all ranges
   for (int i = 0; i < ranges.size(); ++i) {
     RETURN_IF_ERROR(ValidateScanRange(ranges[i]));
     ranges[i]->InitInternal(this, reader);
   }
 
-  // disks that this reader needs to be scheduled on.
   unique_lock<mutex> reader_lock(reader->lock_);
   DCHECK(reader->Validate()) << endl << reader->DebugString();
 
   if (reader->state_ == RequestContext::Cancelled) return Status::CANCELLED;
 
   // Add each range to the queue of the disk the range is on
-  for (int i = 0; i < ranges.size(); ++i) {
+  for (ScanRange* range : ranges) {
     // Don't add empty ranges.
-    DCHECK_NE(ranges[i]->len(), 0);
-    ScanRange* range = ranges[i];
-
+    DCHECK_NE(range->len(), 0);
+    reader->AddActiveScanRangeLocked(reader_lock, range);
     if (range->try_cache_) {
-      if (schedule_immediately) {
-        bool cached_read_succeeded;
-        RETURN_IF_ERROR(range->ReadFromCache(reader_lock, &cached_read_succeeded));
-        if (cached_read_succeeded) continue;
-        // Cached read failed, fall back to AddRequestRange() below.
-      } else {
-        reader->cached_ranges_.Enqueue(range);
-        continue;
-      }
+      reader->cached_ranges_.Enqueue(range);
+    } else {
+      reader->AddRangeToDisk(reader_lock, range, ScheduleMode::UPON_GETNEXT);
     }
-    reader->AddRequestRange(reader_lock, range, schedule_immediately);
   }
   DCHECK(reader->Validate()) << endl << reader->DebugString();
-
   return Status::OK();
 }
 
-Status DiskIoMgr::AddScanRange(
-    RequestContext* reader, ScanRange* range, bool schedule_immediately) {
-  return AddScanRanges(reader, vector<ScanRange*>({range}), schedule_immediately);
+Status DiskIoMgr::StartScanRange(RequestContext* reader, ScanRange* range,
+    bool* needs_buffers) {
+  RETURN_IF_ERROR(ValidateScanRange(range));
+  range->InitInternal(this, reader);
+
+  unique_lock<mutex> reader_lock(reader->lock_);
+  DCHECK(reader->Validate()) << endl << reader->DebugString();
+  if (reader->state_ == RequestContext::Cancelled) return Status::CANCELLED;
+
+  DCHECK_NE(range->len(), 0);
+  if (range->try_cache_) {
+    bool cached_read_succeeded;
+    RETURN_IF_ERROR(range->ReadFromCache(reader_lock, &cached_read_succeeded));
+    if (cached_read_succeeded) {
+      DCHECK(reader->Validate()) << endl << reader->DebugString();
+      *needs_buffers = false;
+      return Status::OK();
+    }
+    // Cached read failed, fall back to normal read path.
+  }
+  // If we don't have a buffer yet, the caller must allocate buffers for the range.
+  *needs_buffers = range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER;
+  if (*needs_buffers) range->SetBlockedOnBuffer();
+  reader->AddActiveScanRangeLocked(reader_lock, range);
+  reader->AddRangeToDisk(reader_lock, range,
+      *needs_buffers ? ScheduleMode::BY_CALLER : ScheduleMode::IMMEDIATELY);
+  DCHECK(reader->Validate()) << endl << reader->DebugString();
+  return Status::OK();
 }
 
// This function returns the next scan range the reader should work on, checking
// for eos and error cases. If there isn't already a cached scan range or a scan
 // range prepared by the disk threads, the caller waits on the disk threads.
-Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) {
+Status DiskIoMgr::GetNextUnstartedRange(RequestContext* reader, ScanRange** range,
+    bool* needs_buffers) {
   DCHECK(reader != nullptr);
   DCHECK(range != nullptr);
   *range = nullptr;
-  Status status = Status::OK();
+  *needs_buffers = false;
 
   unique_lock<mutex> reader_lock(reader->lock_);
   DCHECK(reader->Validate()) << endl << reader->DebugString();
-
   while (true) {
-    if (reader->state_ == RequestContext::Cancelled) {
-      status = Status::CANCELLED;
-      break;
-    }
+    if (reader->state_ == RequestContext::Cancelled) return Status::CANCELLED;
 
     if (reader->num_unstarted_scan_ranges_.Load() == 0 &&
        reader->ready_to_start_ranges_.empty() && reader->cached_ranges_.empty()) {
       // All ranges are done, just return.
-      break;
+      return Status::OK();
     }
 
     if (!reader->cached_ranges_.empty()) {
@@ -420,7 +423,7 @@ Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) {
       if (cached_read_succeeded) return Status::OK();
 
       // This range ended up not being cached. Loop again and pick up a new range.
-      reader->AddRequestRange(reader_lock, *range, false);
+      reader->AddRangeToDisk(reader_lock, *range, ScheduleMode::UPON_GETNEXT);
       DCHECK(reader->Validate()) << endl << reader->DebugString();
       *range = nullptr;
       continue;
@@ -436,13 +439,81 @@ Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) {
       // Set this to nullptr, the next time this disk runs for this reader, it will
       // get another range ready.
       reader->disk_states_[disk_id].set_next_scan_range_to_start(nullptr);
-      reader->ScheduleScanRange(reader_lock, *range);
-      break;
+      ScanRange::ExternalBufferTag buffer_tag = (*range)->external_buffer_tag_;
+      if (buffer_tag == ScanRange::ExternalBufferTag::NO_BUFFER) {
+        // We can't schedule this range until the client gives us buffers. The context
+        // must be rescheduled regardless to ensure that 'next_scan_range_to_start' is
+        // refilled.
+        reader->disk_states_[disk_id].ScheduleContext(reader_lock, reader, disk_id);
+        (*range)->SetBlockedOnBuffer();
+        *needs_buffers = true;
+      } else {
+        reader->ScheduleScanRange(reader_lock, *range);
+      }
+      return Status::OK();
+    }
+  }
+}
+
+Status DiskIoMgr::AllocateBuffersForRange(RequestContext* reader, ScanRange* range,
+    int64_t max_bytes) {
+  DCHECK_GE(max_bytes, min_buffer_size_);
+  DCHECK(range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER)
+     << static_cast<int>(range->external_buffer_tag_) << " invalid to allocate buffers "
+     << "when already reading into an external buffer";
+
+  Status status;
+  vector<unique_ptr<BufferDescriptor>> buffers;
+  for (int64_t buffer_size : ChooseBufferSizes(range->len(), max_bytes)) {
+    if (!reader->mem_tracker_->TryConsume(buffer_size)) {
+      status = reader->mem_tracker_->MemLimitExceeded(nullptr,
+          "Failed to allocate I/O buffer", buffer_size);
+      goto error;
     }
+    uint8_t* buffer = reinterpret_cast<uint8_t*>(malloc(buffer_size));
+    if (buffer == nullptr) {
+      reader->mem_tracker_->Release(buffer_size);
+      status = Status(Substitute("Failed to malloc $0-byte I/O buffer", 
buffer_size));
+      goto error;
+    }
+    buffers.emplace_back(new BufferDescriptor(this, reader, range, buffer, buffer_size));
   }
+  range->AddUnusedBuffers(move(buffers), false);
+  return Status::OK();
+ error:
+  DCHECK(!status.ok());
+  range->CleanUpBuffers(move(buffers));
   return status;
 }
 
+vector<int64_t> DiskIoMgr::ChooseBufferSizes(int64_t scan_range_len, int64_t max_bytes) {
+  DCHECK_GE(max_bytes, min_buffer_size_);
+  vector<int64_t> buffer_sizes;
+  int64_t bytes_allocated = 0;
+  while (bytes_allocated < scan_range_len) {
+    int64_t bytes_remaining = scan_range_len - bytes_allocated;
+    // Either allocate a max-sized buffer or a smaller buffer to fit the rest of the
+    // range.
+    int64_t next_buffer_size;
+    if (bytes_remaining >= max_buffer_size_) {
+      next_buffer_size = max_buffer_size_;
+    } else {
+      next_buffer_size =
+          max(min_buffer_size_, BitUtil::RoundUpToPowerOfTwo(bytes_remaining));
+    }
+    if (next_buffer_size + bytes_allocated > max_bytes) {
+      // Can't allocate the desired buffer size. Make sure to allocate at least one
+      // buffer.
+      if (bytes_allocated > 0) break;
+      next_buffer_size = BitUtil::RoundDownToPowerOfTwo(max_bytes);
+    }
+    DCHECK(BitUtil::IsPowerOf2(next_buffer_size)) << next_buffer_size;
+    buffer_sizes.push_back(next_buffer_size);
+    bytes_allocated += next_buffer_size;
+  }
+  return buffer_sizes;
+}
+
 // This function gets the next RequestRange to work on for this disk. It checks for
 // cancellation and
 // a) Updates ready_to_start_ranges if there are no scan ranges queued for this disk.
@@ -505,16 +576,16 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
         !request_disk_state->unstarted_scan_ranges()->empty()) {
       // We don't have a range queued for this disk for what the caller should
       // read next. Populate that.  We want to have one range waiting to 
minimize
-      // wait time in GetNextRange.
+      // wait time in GetNextUnstartedRange().
       ScanRange* new_range = request_disk_state->unstarted_scan_ranges()->Dequeue();
       (*request_context)->num_unstarted_scan_ranges_.Add(-1);
       (*request_context)->ready_to_start_ranges_.Enqueue(new_range);
       request_disk_state->set_next_scan_range_to_start(new_range);
 
       if ((*request_context)->num_unstarted_scan_ranges_.Load() == 0) {
-        // All the ranges have been started, notify everyone blocked on GetNextRange.
-        // Only one of them will get work so make sure to return nullptr to the other
-        // caller threads.
+        // All the ranges have been started, notify everyone blocked on
+        // GetNextUnstartedRange(). Only one of them will get work so make sure to return
+        // nullptr to the other caller threads.
         (*request_context)->ready_to_start_ranges_cv_.NotifyAll();
       } else {
         (*request_context)->ready_to_start_ranges_cv_.NotifyOne();
@@ -584,8 +655,8 @@ void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, RequestContext* reader
   DCHECK(buffer->buffer_ != nullptr);
   DCHECK(!buffer->is_cached()) << "HDFS cache reads don't go through this code path.";
 
-  // After calling EnqueueBuffer() below, it is no longer valid to read from buffer.
-  // Store the state we need before calling EnqueueBuffer().
+  // After calling EnqueueReadyBuffer() below, it is no longer valid to read from buffer.
+  // Store the state we need before calling EnqueueReadyBuffer().
   bool eosr = buffer->eosr_;
 
   // TODO: IMPALA-4249: it safe to touch 'scan_range' until DecrementDiskThread() is
@@ -596,23 +667,18 @@ void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, RequestContext* reader
   if (read_status.ok() && reader->state_ != RequestContext::Cancelled) {
     DCHECK_EQ(reader->state_, RequestContext::Active);
     // Read successfully - update the reader's scan ranges.  There are two cases here:
-    //  1. End of scan range
-    //  2. Middle of scan range
-    bool queue_full = scan_range->EnqueueBuffer(reader_lock, move(buffer));
-    if (!eosr) {
-      if (queue_full) {
-        reader->blocked_ranges_.Enqueue(scan_range);
-      } else {
-        reader->ScheduleScanRange(reader_lock, scan_range);
-      }
-    }
+    //  1. End of scan range or cancelled scan range - don't need to reschedule.
+    //  2. Middle of scan range - need to schedule to read next buffer.
+    bool enqueued = scan_range->EnqueueReadyBuffer(reader_lock, move(buffer));
+    if (!eosr && enqueued) reader->ScheduleScanRange(reader_lock, scan_range);
   } else {
     // The scan range will be cancelled, either because we hit an error or because the
     // request context was cancelled.  The buffer is not needed - we must free it.
     reader->FreeBuffer(buffer.get());
-    reader->num_used_buffers_.Add(-1);
-    // Propagate the error or cancellation by cancelling the scan range.
-    scan_range->Cancel(read_status.ok() ? Status::CANCELLED : read_status);
+    // Propagate 'read_status' to the scan range. If we are here because the context
+    // was cancelled, the scan range is already cancelled so we do not need to re-cancel
+    // it.
+    if (!read_status.ok()) scan_range->CancelFromReader(reader_lock, read_status);
     scan_range_done = true;
   }
   if (scan_range_done) {
@@ -636,14 +702,12 @@ void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
   //   3. Perform the read or write as specified.
   // Cancellation checking needs to happen in both steps 1 and 3.
   while (true) {
-    RequestContext* worker_context = nullptr;;
+    RequestContext* worker_context = nullptr;
     RequestRange* range = nullptr;
-
     if (!GetNextRequestRange(disk_queue, &range, &worker_context)) {
       DCHECK(shut_down_);
-      break;
+      return;
     }
-
     if (range->request_type() == RequestType::READ) {
       ReadRange(disk_queue, worker_context, static_cast<ScanRange*>(range));
     } else {
@@ -651,12 +715,8 @@ void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
       Write(worker_context, static_cast<WriteRange*>(range));
     }
   }
-
-  DCHECK(shut_down_);
 }
 
-// This function reads the specified scan range associated with the
-// specified reader context and disk queue.
 void DiskIoMgr::ReadRange(
     DiskQueue* disk_queue, RequestContext* reader, ScanRange* range) {
   int64_t bytes_remaining = range->len_ - range->bytes_read_;
@@ -669,25 +729,23 @@ void DiskIoMgr::ReadRange(
     DCHECK(range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER)
         << "This code path does not handle other buffer types, i.e. HDFS cache"
         << static_cast<int>(range->external_buffer_tag_);
-    // Need to allocate a buffer to read into.
-    int64_t buffer_size = min(bytes_remaining, max_buffer_size_);
-    buffer_desc = TryAllocateNextBufferForRange(disk_queue, reader, range, buffer_size);
-    if (buffer_desc == nullptr) return;
+    buffer_desc = range->GetNextUnusedBufferForRange();
+    if (buffer_desc == nullptr) {
+      // No buffer available - the range will be rescheduled when a buffer is added.
+      unique_lock<mutex> reader_lock(reader->lock_);
+      reader->disk_states_[disk_queue->disk_id].DecrementDiskThread(reader_lock, reader);
+      DCHECK(reader->Validate()) << endl << reader->DebugString();
+      return;
+    }
   }
-  reader->num_used_buffers_.Add(1);
 
   // No locks in this section.  Only working on local vars.  We don't want to hold a
   // lock across the read call.
   Status read_status = range->Open(detail::is_file_handle_caching_enabled());
   if (read_status.ok()) {
     // Update counters.
-    if (reader->active_read_thread_counter_) {
-      reader->active_read_thread_counter_->Add(1L);
-    }
-    if (reader->disks_accessed_bitmap_) {
-      int64_t disk_bit = 1LL << disk_queue->disk_id;
-      reader->disks_accessed_bitmap_->BitOr(disk_bit);
-    }
+    COUNTER_ADD_IF_NOT_NULL(reader->active_read_thread_counter_, 1L);
+    COUNTER_BITOR_IF_NOT_NULL(reader->disks_accessed_bitmap_, 1LL << disk_queue->disk_id);
     SCOPED_TIMER(&read_timer_);
     SCOPED_TIMER(reader->read_timer_);
 
@@ -695,69 +753,15 @@ void DiskIoMgr::ReadRange(
         &buffer_desc->len_, &buffer_desc->eosr_);
     buffer_desc->scan_range_offset_ = range->bytes_read_ - buffer_desc->len_;
 
-    if (reader->bytes_read_counter_ != nullptr) {
-      COUNTER_ADD(reader->bytes_read_counter_, buffer_desc->len_);
-    }
-
+    COUNTER_ADD_IF_NOT_NULL(reader->bytes_read_counter_, buffer_desc->len_);
     COUNTER_ADD(&total_bytes_read_counter_, buffer_desc->len_);
-    if (reader->active_read_thread_counter_) {
-      reader->active_read_thread_counter_->Add(-1L);
-    }
+    COUNTER_ADD_IF_NOT_NULL(reader->active_read_thread_counter_, -1L);
   }
 
   // Finished read, update reader/disk based on the results
   HandleReadFinished(disk_queue, reader, read_status, move(buffer_desc));
 }
 
-unique_ptr<BufferDescriptor> DiskIoMgr::TryAllocateNextBufferForRange(
-    DiskQueue* disk_queue, RequestContext* reader, ScanRange* range,
-    int64_t buffer_size) {
-  DCHECK(reader->mem_tracker_ != nullptr);
-  // TODO: replace this with reservation check (if needed at all).
-  bool enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY;
-
-  RequestContext::PerDiskState* disk_state = &reader->disk_states_[disk_queue->disk_id];
-  if (!enough_memory) {
-    unique_lock<mutex> reader_lock(reader->lock_);
-
-    // Just grabbed the reader lock, check for cancellation.
-    if (reader->state_ == RequestContext::Cancelled) {
-      DCHECK(reader->Validate()) << endl << reader->DebugString();
-      disk_state->DecrementDiskThread(reader_lock, reader);
-      range->Cancel(Status::CANCELLED);
-      DCHECK(reader->Validate()) << endl << reader->DebugString();
-      return nullptr;
-    }
-
-    if (!range->ready_buffers_.empty()) {
-      // We have memory pressure and this range doesn't need another buffer
-      // (it already has one queued). Skip this range and pick it up later.
-      range->blocked_on_queue_ = true;
-      reader->blocked_ranges_.Enqueue(range);
-      disk_state->DecrementDiskThread(reader_lock, reader);
-      return nullptr;
-    } else {
-      // We need to get a buffer anyway since there are none queued. The query
-      // is likely to fail due to mem limits but there's nothing we can do about that
-      // now.
-    }
-  }
-  unique_ptr<BufferDescriptor> buffer_desc;
-  Status status = reader->AllocBuffer(range, buffer_size, &buffer_desc);
-  if (!status.ok()) {
-    // Hit memory limit - cancel range.
-    range->Cancel(status);
-    {
-      unique_lock<mutex> reader_lock(reader->lock_);
-      disk_state->DecrementDiskThread(reader_lock, reader);
-      DCHECK(reader->Validate()) << endl << reader->DebugString();
-    }
-    return nullptr;
-  }
-  DCHECK(buffer_desc != nullptr);
-  return buffer_desc;
-}
-
void DiskIoMgr::Write(RequestContext* writer_context, WriteRange* write_range) {
   Status ret_status = Status::OK();
   FILE* file_handle = nullptr;
@@ -809,17 +813,14 @@ Status DiskIoMgr::WriteRangeHelper(FILE* file_handle, WriteRange* write_range) {
         Substitute("fwrite(buffer, 1, $0, $1) failed with errno=$2 description=$3",
         write_range->len_, write_range->file_, errno, GetStrErrMsg())));
   }
-  if (ImpaladMetrics::IO_MGR_BYTES_WRITTEN != nullptr) {
-    ImpaladMetrics::IO_MGR_BYTES_WRITTEN->Increment(write_range->len_);
-  }
-
+  ImpaladMetrics::IO_MGR_BYTES_WRITTEN->Increment(write_range->len_);
   return Status::OK();
 }
 
Status DiskIoMgr::AddWriteRange(RequestContext* writer, WriteRange* write_range) {
   unique_lock<mutex> writer_lock(writer->lock_);
   if (writer->state_ == RequestContext::Cancelled) return Status::CANCELLED;
-  writer->AddRequestRange(writer_lock, write_range, false);
+  writer->AddRangeToDisk(writer_lock, write_range, ScheduleMode::IMMEDIATELY);
   return Status::OK();
 }
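
For reference, the writer-side flow looks roughly like the sketch below. This is
illustrative only: the WriteRange constructor, WriteDoneCallback and SetData() follow
request-ranges.h, while 'io_mgr', 'writer_context', 'tmp_file', 'offset', 'disk_id',
'buffer' and 'len' are assumed to exist in the caller:

    // The callback runs on a disk thread once the write completes or is cancelled.
    WriteRange::WriteDoneCallback callback = [](const Status& write_status) {
      // Propagate 'write_status' to whoever is waiting on the write.
    };
    WriteRange* write_range = new WriteRange(tmp_file, offset, disk_id, callback);
    // No copy is made: 'buffer' must remain valid until the callback is invoked.
    write_range->SetData(buffer, len);
    RETURN_IF_ERROR(io_mgr->AddWriteRange(writer_context, write_range));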
 

http://git-wip-us.apache.org/repos/asf/impala/blob/0b6fab73/be/src/runtime/io/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index 20f5d44..d429d1d 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -48,15 +48,15 @@ namespace io {
 /// Manager object that schedules IO for all queries on all disks and remote filesystems
 /// (such as S3). Each query maps to one or more RequestContext objects, each of which
 /// has its own queue of scan ranges and/or write ranges.
-//
+///
 /// The API splits up requesting scan/write ranges (non-blocking) and reading the data
 /// (blocking). The DiskIoMgr has worker threads that will read from and write to
 /// disk/hdfs/remote-filesystems, allowing interleaving of IO and CPU. This allows us to
 /// keep all disks and all cores as busy as possible.
-//
+///
 /// All public APIs are thread-safe. It is not valid to call any of the APIs after
 /// UnregisterContext() returns.
-//
+///
 /// For Readers:
 /// We can model this problem as a multiple producer (threads for each disk), multiple
 /// consumer (scan ranges) problem. There are multiple queues that need to be
@@ -68,84 +68,101 @@ namespace io {
 /// Readers map to scan nodes. The reader then contains a queue of scan ranges. The caller
 /// asks the IoMgr for the next range to process. The IoMgr then selects the best range
 /// to read based on disk activity and begins reading and queuing buffers for that range.
-/// TODO: We should map readers to queries. A reader is the unit of scheduling and queries
-/// that have multiple scan nodes shouldn't have more 'turns'.
-//
+///
 /// For Writers:
 /// Data is written via AddWriteRange(). This is non-blocking and adds a WriteRange to a
 /// per-disk queue. After the write is complete, a callback in WriteRange is invoked.
 /// No memory is allocated within IoMgr for writes and no copies are made. It is the
 /// responsibility of the client to ensure that the data to be written is valid and that
 /// the file to be written to exists until the callback is invoked.
-//
-/// The IoMgr provides three key APIs.
-///  1. AddScanRanges: this is non-blocking and tells the IoMgr all the ranges that
-///     will eventually need to be read.
-///  2. GetNextRange: returns to the caller the next scan range it should process.
-///     This is based on disk load. This also begins reading the data in this scan
-///     range. This is blocking.
-///  3. ScanRange::GetNext: returns the next buffer for this range.  This is blocking.
-//
+///
+/// There are several key methods for scanning data with the IoMgr.
+///  1. StartScanRange(): adds a range to the IoMgr and starts it immediately.
+///  2. AddScanRanges(): adds ranges to the IoMgr that the reader wants to scan, but does
+///     not start them until GetNextUnstartedRange() is called.
+///  3. GetNextUnstartedRange(): returns to the caller the next scan range it should
+///     process.
+///  4. ScanRange::GetNext(): returns the next buffer for this range, blocking until
+///     data is available.
+///
 /// The disk threads do not synchronize with each other. The readers and writers don't
 /// synchronize with each other. There is a lock and condition variable for each request
 /// context queue and each disk queue.
 /// IMPORTANT: whenever both locks are needed, the lock order is to grab the context lock
 /// before the disk lock.
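+/// For instance (an illustrative sketch; the member names here are assumptions):
+///   unique_lock<mutex> context_lock(context->lock_);
+///   unique_lock<mutex> disk_lock(disk_queue->lock);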
-//
+///
 /// Scheduling: If there are multiple request contexts with work for a single disk, the
 /// request contexts are scheduled in round-robin order. Multiple disk threads can
 /// operate on the same request context. Exactly one request range is processed by a
-/// disk thread at a time. If there are multiple scan ranges scheduled via
-/// GetNextRange() for a single context, these are processed in round-robin order.
+/// disk thread at a time. If there are multiple scan ranges scheduled for a single
+/// context, these are processed in round-robin order.
 /// If there are multiple scan and write ranges for a disk, a read is always followed
 /// by a write, and a write is followed by a read, i.e. reads and writes alternate.
 /// If multiple write ranges are enqueued for a single disk, they will be processed
 /// by the disk threads in order, but may complete in any order. No guarantees are made
 /// on ordering of writes across disks.
-//
-/// Resource Management: effective resource management in the IoMgr is key to good
-/// performance. The IoMgr helps coordinate two resources: CPU and disk. For CPU,
-/// spinning up too many threads causes thrashing.
-/// Memory usage in the IoMgr comes from queued read buffers.  If we queue the minimum
-/// (i.e. 1), then the disks are idle while we are processing the buffer. If we don't
-/// limit the queue, then it possible we end up queueing the entire data set (i.e. CPU
-/// is slower than disks) and run out of memory.
-/// For both CPU and memory, we want to model the machine as having a fixed amount of
-/// resources.  If a single query is running, it should saturate either CPU or Disk
-/// as well as using as little memory as possible. With multiple queries, each query
-/// should get less CPU. In that case each query will need fewer queued buffers and
-/// therefore have less memory usage.
-//
-/// The IoMgr defers CPU management to the caller. The IoMgr provides a GetNextRange
-/// API which will return the next scan range the caller should process. The caller
-/// can call this from the desired number of reading threads. Once a scan range
-/// has been returned via GetNextRange, the IoMgr will start to buffer reads for
-/// that range and it is expected the caller will pull those buffers promptly. For
-/// example, if the caller would like to have 1 scanner thread, the read loop
-/// would look like:
+///
+/// Resource Management: the IoMgr is designed to share the available disk I/O capacity
+/// between many clients and to help use the available I/O capacity efficiently. The IoMgr
+/// interfaces are designed to let clients manage their own CPU and memory usage while the
+/// IoMgr manages the allocation of the I/O capacity of different I/O devices to scan
+/// ranges of different clients.
+///
+/// IoMgr clients may want to work on multiple scan ranges at a time to maximize CPU and
+/// I/O utilization. Clients can call GetNextUnstartedRange() to start as many concurrent
+/// scan ranges as required, e.g. from each parallel scanner thread. Once a scan range has
+/// been returned via GetNextUnstartedRange(), the caller must allocate any memory needed
+/// for buffering reads, after which the IoMgr will start to fill the buffers with data
+/// while the caller concurrently consumes and processes the data. For example, the logic
+/// in a scanner thread might look like:
 ///   while (more_ranges)
-///     range = GetNextRange()
+///     range = GetNextUnstartedRange()
 ///     while (!range.eosr)
 ///       buffer = range.GetNext()
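+///
+/// With upfront buffer allocation, the loop also needs to handle '*needs_buffers' and
+/// return buffers for recycling, along these lines (a sketch, not verbatim API usage):
+///   while (more_ranges)
+///     range = GetNextUnstartedRange(&needs_buffers)
+///     if (needs_buffers) AllocateBuffersForRange(range, 3 * max_buffer_size())
+///     while (!range.eosr)
+///       buffer = range.GetNext()
+///       ... process buffer ...
+///       ReturnBuffer(buffer)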
-/// To have multiple reading threads, the caller would simply spin up the threads
-/// and each would process the loops above.
-//
-/// To control the number of IO buffers, each scan range has a limit of two queued
-/// buffers (SCAN_RANGE_READY_BUFFER_LIMIT). If the number of buffers is at capacity,
-/// the IoMgr will no longer read for that scan range until the caller has processed
-/// a buffer. Assuming the client returns each buffer before requesting the next one
-/// from the scan range, then this will consume up to 3 * 8MB = 24MB of I/O buffers per
-/// scan range.
-//
+///
+/// Note that the IoMgr rather than the client is responsible for choosing which scan
+/// range to process next, which allows optimizations like distributing load across disks.
+///
 /// Buffer Management:
-/// Buffers for reads are either a) allocated by the IoMgr and transferred to the caller,
-/// b) cached HDFS buffers if the scan range uses HDFS caching, or c) provided by the
-/// caller when constructing the scan range.
+/// Buffers for reads are either a) allocated on behalf of the caller with
+/// AllocateBuffersForRange() ("IoMgr-allocated"), b) cached HDFS buffers if the scan
+/// range was read from the HDFS cache, or c) a client buffer, large enough to fit the
+/// whole scan range's data, that is provided by the caller when constructing the
+/// scan range.
+///
+/// All three kinds of buffers are wrapped in BufferDescriptors before returning to the
+/// caller. The caller must always call ReturnBuffer() on the buffer descriptor to allow
+/// recycling of the buffer memory and to release any resources associated with the buffer
+/// or scan range.
 ///
-/// As a caller reads from a scan range, these buffers are wrapped in BufferDescriptors
-/// and returned to the caller. The caller must always call ReturnBuffer() on the buffer
-/// descriptor to allow freeing of the associated buffer (if there is an IoMgr-allocated
-/// or HDFS cached buffer).
+/// In case a), ReturnBuffer() may re-enqueue the buffer for GetNext() to return again if
+/// needed. E.g. if 24MB of buffers were allocated to read a 64MB scan range, each buffer
+/// must be returned multiple times. Callers must be careful to call ReturnBuffer() with
+/// the previous buffer returned from the range before calling GetNext() so that
+/// at least one buffer is available for the I/O mgr to read data into. Calling GetNext()
+/// when the scan range has no buffers to read data into causes a resource deadlock.
+/// NB: if the scan range was allocated N buffers, then it's always ok for the caller
+/// to hold onto N - 1 buffers, but currently the IoMgr doesn't give the caller a way
+/// to determine the value of N.
+///
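+/// A well-behaved consumer therefore looks something like this (sketch only):
+///   prev_buffer = nullptr
+///   while (!range.eosr)
+///     if (prev_buffer != nullptr) ReturnBuffer(prev_buffer)  // recycle before GetNext()
+///     buffer = range.GetNext()
+///     ... process buffer ...
+///     prev_buffer = buffer
+///   ReturnBuffer(prev_buffer)
+///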
+/// If the caller wants to maximize I/O throughput, it can give the range enough memory
+/// for 3 max-sized buffers per scan range. Having two queued buffers (plus the buffer
+/// that is currently being processed by the client) gives good performance in most
+/// scenarios:
+/// 1. If the consumer is consuming data faster than we can read from disk, then the
+///    queue will be empty most of the time because the buffer will be immediately
+///    pulled off the queue as soon as it is added. There will always be an I/O request
+///    in the disk queue to maximize I/O throughput, which is the bottleneck in this
+///    case.
+/// 2. If we can read from disk faster than the consumer is consuming data, the queue
+///    will fill up and there will always be a buffer available for the consumer to
+///    read, so the consumer will not block and we maximize consumer throughput, which
+///    is the bottleneck in this case.
+/// 3. If the consumer is consuming data at approximately the same rate as we are
+///    reading from disk, then the steady state is that the consumer is processing one
+///    buffer and one buffer is in the disk queue. The additional buffer can absorb
+///    bursts where the producer runs faster than the consumer or the consumer runs
+///    faster than the producer without blocking either the producer or consumer.
 ///
 /// Caching support:
 /// Scan ranges contain metadata on whether or not it is cached on the DN. In that
@@ -161,13 +178,13 @@ namespace io {
 ///   - HDFS will time us out if we hold onto the mlock for too long
 ///   - Holding the lock prevents uncaching this file due to a caching policy change.
 /// Therefore, we only issue the cached read when the caller is ready to process the
-/// range (GetNextRange()) instead of when the ranges are issued. This guarantees that
-/// there will be a CPU available to process the buffer and any throttling we do with
+/// range (GetNextUnstartedRange()) instead of when the ranges are issued. This guarantees
+/// that there will be a CPU available to process the buffer and any throttling we do with
 /// the number of scanner threads properly controls the amount of files we mlock.
 /// With cached scan ranges, we cannot close the scan range until the cached buffer
 /// is returned (HDFS does not allow this). We therefore need to defer the close until
 /// the cached buffer is returned (ReturnBuffer()).
-//
+///
 /// Remote filesystem support (e.g. S3):
 /// Remote filesystems are modeled as "remote disks". That is, there is a separate disk
 /// queue for each supported remote filesystem type. In order to maximize throughput,
@@ -176,12 +193,13 @@ namespace io {
 /// intensive than local disk/hdfs because of non-direct I/O and SSL processing, and can
 /// be CPU bottlenecked especially if not enough I/O threads for these queues are
 /// started.
-//
+///
+/// TODO: We should implement more sophisticated resource management. Currently readers
+/// are the unit of scheduling and we attempt to distribute IOPS between them. Instead
+/// it would be better to have policies based on queries, resource pools, etc.
 /// TODO: IoMgr should be able to request additional scan ranges from the coordinator
 /// to help deal with stragglers.
-/// TODO: look into using a lock free queue
-/// TODO: simplify the common path (less locking, memory allocations).
-//
+///
 /// Structure of the Implementation:
 ///  - All client APIs are defined in this file, request-ranges.h and request-context.h.
 ///    Clients can include only the files that they need.
@@ -204,8 +222,10 @@ class DiskIoMgr : public CacheLineAligned {
   ///    disk. This is also the max queue depth.
   ///  - threads_per_solid_state_disk: number of read threads to create per solid state
   ///    disk. This is also the max queue depth.
-  ///  - min_buffer_size: minimum io buffer size (in bytes)
-  ///  - max_buffer_size: maximum io buffer size (in bytes). Also the max read size.
+  ///  - min_buffer_size: minimum io buffer size (in bytes). Will be rounded down to the
+  ///    nearest power-of-two.
+  ///  - max_buffer_size: maximum io buffer size (in bytes). Will be rounded up to the
+  ///    nearest power-of-two. Also the max read size.
   DiskIoMgr(int num_disks, int threads_per_rotational_disk,
       int threads_per_solid_state_disk, int64_t min_buffer_size, int64_t max_buffer_size);
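
The power-of-two rounding described above can be sketched as a small self-contained
program; the helper bodies below are illustrative stand-ins, not necessarily the exact
BitUtil implementations in be/src/util/bit-util.h that this change touches:

    #include <cstdint>
    #include <iostream>

    // Largest power-of-two <= v (v must be > 0).
    int64_t RoundDownToPowerOfTwo(int64_t v) {
      int64_t result = 1;
      while (result * 2 <= v) result *= 2;
      return result;
    }

    // Smallest power-of-two >= v (v must be > 0).
    int64_t RoundUpToPowerOfTwo(int64_t v) {
      int64_t result = 1;
      while (result < v) result *= 2;
      return result;
    }

    int main() {
      // E.g. a 10000-byte minimum becomes 8192; a 5MB maximum becomes 8MB.
      std::cout << RoundDownToPowerOfTwo(10000) << "\n";           // 8192
      std::cout << RoundUpToPowerOfTwo(5 * 1024 * 1024) << "\n";   // 8388608
      return 0;
    }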
 
@@ -237,29 +257,61 @@ class DiskIoMgr : public CacheLineAligned {
   /// up.
   void UnregisterContext(RequestContext* context);
 
-  /// Adds the scan ranges to the queues. This call is non-blocking. The caller must
-  /// not deallocate the scan range pointers before UnregisterContext().
-  /// If schedule_immediately, the ranges are immediately put on the read queue
-  /// (i.e. the caller should not/cannot call GetNextRange for these ranges).
-  /// This can be used to do synchronous reads as well as schedule dependent ranges,
-  /// as in the case for columnar formats.
-  Status AddScanRanges(RequestContext* reader,
-      const std::vector<ScanRange*>& ranges,
-      bool schedule_immediately = false) WARN_UNUSED_RESULT;
-  Status AddScanRange(RequestContext* reader, ScanRange* range,
-      bool schedule_immediately = false) WARN_UNUSED_RESULT;
+  /// Adds the scan ranges to the reader's queues, but does not start scheduling them.
+  /// The ranges can be scheduled by a thread calling GetNextUnstartedRange(). This call
+  /// is non-blocking. The caller must not deallocate the scan range pointers before
+  /// UnregisterContext().
+  Status AddScanRanges(
+      RequestContext* reader, const std::vector<ScanRange*>& ranges) WARN_UNUSED_RESULT;
+
+  /// Adds the scan range to the queues, as with AddScanRanges(), but immediately starts
+  /// scheduling the scan range. This can be used to do synchronous reads as well as
+  /// schedule dependent ranges, e.g. for columnar formats. This call is non-blocking.
+  /// The caller must not deallocate the scan range pointers before UnregisterContext().
+  ///
+  /// If this returns true in '*needs_buffers', the caller must then call
+  /// AllocateBuffersForRange() to add buffers for the data to be read into before the
+  /// range can be scheduled. Otherwise, the range is scheduled and the IoMgr will
+  /// asynchronously read the data for the range and the caller can call
+  /// ScanRange::GetNext() to read the data.
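+  ///
+  /// E.g. a synchronous read might look like this (a sketch; error handling is elided
+  /// and 'io_mgr'/'max_bytes' are assumed to be in scope):
+  ///   bool needs_buffers;
+  ///   RETURN_IF_ERROR(io_mgr->StartScanRange(reader, range, &needs_buffers));
+  ///   if (needs_buffers) {
+  ///     RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(reader, range, max_bytes));
+  ///   }
+  ///   std::unique_ptr<BufferDescriptor> buffer;
+  ///   RETURN_IF_ERROR(range->GetNext(&buffer));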
+  Status StartScanRange(
+      RequestContext* reader, ScanRange* range, bool* needs_buffers) WARN_UNUSED_RESULT;
 
   /// Add a WriteRange for the writer. This is non-blocking and schedules the context
   /// on the IoMgr disk queue. Does not create any files.
   Status AddWriteRange(
       RequestContext* writer, WriteRange* write_range) WARN_UNUSED_RESULT;
 
-  /// Returns the next unstarted scan range for this reader. When the range is returned,
-  /// the disk threads in the IoMgr will already have started reading from it. The
-  /// caller is expected to call ScanRange::GetNext on the returned range.
-  /// If there are no more unstarted ranges, nullptr is returned.
-  /// This call is blocking.
-  Status GetNextRange(RequestContext* reader, ScanRange** range) WARN_UNUSED_RESULT;
+  /// Tries to get an unstarted scan range that was added to 'reader' with
+  /// AddScanRanges(). On success, returns OK and returns the range in '*range'.
+  /// If 'reader' was cancelled, returns CANCELLED. If another error is encountered,
+  /// an error status is returned. Otherwise, if no error or cancellation was encountered
+  /// and there are no unstarted ranges for 'reader', returns OK and sets '*range' to
+  /// nullptr.
+  ///
+  /// If '*needs_buffers' is returned as true, the caller must call
+  /// AllocateBuffersForRange() to add buffers for the data to be read into before the
+  /// range can be scheduled. Otherwise, the range is scheduled and the IoMgr will
+  /// asynchronously read the data for the range and the caller can call
+  /// ScanRange::GetNext() to read the data.
+  Status GetNextUnstartedRange(RequestContext* reader, ScanRange** range,
+      bool* needs_buffers) WARN_UNUSED_RESULT;
+
+  /// Allocates up to 'max_bytes' buffers to read the data from 'range' into and schedules
+  /// the range. Called after StartScanRange() or GetNextUnstartedRange() returns
+  /// *needs_buffers=true.
+  ///
+  /// The buffer sizes are chosen based on range->len(). 'max_bytes' must be >=
+  /// min_buffer_size() so that at least one buffer can be allocated. Returns OK
+  /// if the buffers were successfully allocated and the range was scheduled. Fails with
+  /// MEM_LIMIT_EXCEEDED if the buffers could not be allocated. On failure, any allocated
+  /// buffers are freed and the state of 'range' is unmodified so that allocation can be
+  /// retried. Setting 'max_bytes' to 3 * max_buffer_size() will typically maximize I/O
+  /// throughput. See the "Buffer management" section of the class comment for an
+  /// explanation.
+  /// TODO: error handling contract will change with reservations. The caller needs
+  /// to guarantee that there is sufficient reservation.
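+  ///
+  /// Illustrative arithmetic, assuming an 8MB max buffer size: passing max_bytes = 24MB
+  /// for a 64MB scan range yields three 8MB buffers that are recycled as the range is
+  /// read, while a 100KB range would get a single small power-of-two buffer (e.g. 128KB).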
+  Status AllocateBuffersForRange(RequestContext* reader, ScanRange* range,
+      int64_t max_bytes);
 
   /// Determine which disk queue this file should be assigned to. Returns an index into
   /// disk_queues_. The disk_id is the volume ID for the local disk that holds the
@@ -267,8 +319,8 @@ class DiskIoMgr : public CacheLineAligned {
   /// co-located with the datanode for this file.
   int AssignQueue(const char* file, int disk_id, bool expected_local);
 
-  /// Returns the maximum read buffer size
-  int64_t max_read_buffer_size() const { return max_buffer_size_; }
+  int64_t min_buffer_size() const { return min_buffer_size_; }
+  int64_t max_buffer_size() const { return max_buffer_size_; }
 
   /// Returns the total number of disk queues (both local and remote).
   int num_total_disks() const { return disk_queues_.size(); }
@@ -318,25 +370,6 @@ class DiskIoMgr : public CacheLineAligned {
   Status ReopenCachedHdfsFileHandle(const hdfsFS& fs, std::string* fname, int64_t mtime,
       CachedHdfsFileHandle** fid);
 
-  /// The maximum number of ready buffers that can be queued in a scan range. Having two
-  /// queued buffers (plus the buffer that is returned to the client) gives good
-  /// performance in most scenarios:
-  /// 1. If the consumer is consuming data faster than we can read from disk, then the
-  ///    queue will be empty most of the time because the buffer will be immediately
-  ///    pulled off the queue as soon as it is added. There will always be an I/O request
-  ///    in the disk queue to maximize I/O throughput, which is the bottleneck in this
-  ///    case.
-  /// 2. If we can read from disk faster than the consumer is consuming data, the queue
-  ///    will fill up and there will always be a buffer available for the consumer to
-  ///    read, so the consumer will not block and we maximize consumer throughput, which
-  ///    is the bottleneck in this case.
-  /// 3. If the consumer is consuming data at approximately the same rate as we are
-  ///    reading from disk, then the steady state is that the consumer is processing one
-  ///    buffer and one buffer is in the disk queue. The additional buffer can absorb
-  ///    bursts where the producer runs faster than the consumer or the consumer runs
-  ///    faster than the producer without blocking either the producer or consumer.
-  static const int SCAN_RANGE_READY_BUFFER_LIMIT = 2;
-
   /// "Disk" queue offsets for remote accesses.  Offset 0 corresponds to
   /// disk ID (i.e. disk_queue_ index) of num_local_disks().
   enum {
@@ -354,6 +387,7 @@ class DiskIoMgr : public CacheLineAligned {
   struct DiskQueue;
 
   friend class DiskIoMgrTest_Buffers_Test;
+  friend class DiskIoMgrTest_BufferSizeSelection_Test;
   friend class DiskIoMgrTest_VerifyNumThreadsParameter_Test;
 
   /// Number of worker(read) threads per rotational disk. Also the max depth of queued
@@ -442,17 +476,14 @@ class DiskIoMgr : public CacheLineAligned {
   /// Does not open or close the file that is written.
   Status WriteRangeHelper(FILE* file_handle, WriteRange* write_range) WARN_UNUSED_RESULT;
 
-  /// Reads the specified scan range and calls HandleReadFinished() when done.
+  /// Reads the specified scan range and calls HandleReadFinished() when done. If no
+  /// buffer is available to read the range's data into, the read cannot proceed; the
+  /// range becomes blocked and this function returns without doing I/O.
   void ReadRange(DiskQueue* disk_queue, RequestContext* reader, ScanRange* range);
 
-  /// Try to allocate the next buffer for the scan range, returning the new buffer
-  /// if successful. If 'reader' is cancelled, cancels the range and returns nullptr.
-  /// If there is memory pressure and buffers are already queued, adds the range
-  /// to the blocked ranges and returns nullptr. If buffers are not queued and no more
-  /// buffers can be allocated, cancels the range with a MEM_LIMIT_EXCEEDED error and
-  /// returns nullptr.
-  std::unique_ptr<BufferDescriptor> TryAllocateNextBufferForRange(DiskQueue* disk_queue,
-      RequestContext* reader, ScanRange* range, int64_t buffer_size);
+  /// Helper for AllocateBuffersForRange() to compute the buffer sizes for a scan range
+  /// with length 'scan_range_len', given that 'max_bytes' of memory should be allocated.
+  std::vector<int64_t> ChooseBufferSizes(int64_t scan_range_len, int64_t max_bytes);
 };
 }
 }
