IMPALA-4835 (prep only): create io subfolder and namespace

Instead of using the DiskIoMgr class as a namespace, which prevents
forward-declaration of its inner classes, create an impala::io namespace
and un-nest the inner classes.

This is done in anticipation of DiskIoMgr depending on BufferPool. It
avoids a circular dependency between the DiskIoMgr, TmpFileMgr, and
BufferPool headers that could not otherwise be broken with forward
declarations.
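
For illustration, a minimal sketch of what the un-nesting enables (the
AddRange() method below is hypothetical, not part of this change). C++
does not allow a nested class to be forward-declared from outside its
enclosing class, so any header mentioning DiskIoMgr::ScanRange* had to
include all of disk-io-mgr.h. A namespace-level class can be
forward-declared anywhere:

    namespace impala {
    namespace io {
    class ScanRange;  // forward declaration; no io header include needed
    }

    class TmpFileMgr {
     public:
      // Hypothetical method for illustration only: a pointer parameter
      // compiles against the forward declaration alone, so only the .cc
      // file needs to include the full io headers.
      void AddRange(io::ScanRange* range);
    };
    }  // namespace impala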

Testing:
Ran core tests.

Change-Id: If807f93a47d8027a43e56dd80b1b535d0bb74e1b
Reviewed-on: http://gerrit.cloudera.org:8080/8424
Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b840137c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b840137c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b840137c

Branch: refs/heads/master
Commit: b840137c940d71af5cec2daf482b523a38b6a9f1
Parents: 2510fe0
Author: Tim Armstrong <tarmstr...@cloudera.com>
Authored: Mon Oct 30 16:34:47 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org>
Committed: Fri Nov 17 22:47:34 2017 +0000

----------------------------------------------------------------------
 be/CMakeLists.txt                               |    2 +
 be/src/exec/base-sequence-scanner.cc            |    9 +-
 be/src/exec/hdfs-parquet-scanner.cc             |   36 +-
 be/src/exec/hdfs-parquet-scanner.h              |    4 +-
 be/src/exec/hdfs-scan-node-base.cc              |   25 +-
 be/src/exec/hdfs-scan-node-base.h               |   26 +-
 be/src/exec/hdfs-scan-node-mt.h                 |    2 +-
 be/src/exec/hdfs-scan-node.cc                   |    7 +-
 be/src/exec/hdfs-scan-node.h                    |    6 +-
 be/src/exec/hdfs-text-scanner.cc                |    9 +-
 be/src/exec/kudu-scan-node.cc                   |    2 +-
 be/src/exec/scanner-context.cc                  |   12 +-
 be/src/exec/scanner-context.h                   |   14 +-
 be/src/runtime/CMakeLists.txt                   |   10 +-
 be/src/runtime/disk-io-mgr-handle-cache.h       |  196 ---
 .../runtime/disk-io-mgr-handle-cache.inline.h   |  231 ----
 be/src/runtime/disk-io-mgr-internal.h           |   76 --
 be/src/runtime/disk-io-mgr-reader-context.cc    |  292 -----
 be/src/runtime/disk-io-mgr-reader-context.h     |  406 ------
 be/src/runtime/disk-io-mgr-scan-range.cc        |  591 ---------
 be/src/runtime/disk-io-mgr-stress-test.cc       |   60 -
 be/src/runtime/disk-io-mgr-stress.cc            |  246 ----
 be/src/runtime/disk-io-mgr-stress.h             |   94 --
 be/src/runtime/disk-io-mgr-test.cc              | 1127 -----------------
 be/src/runtime/disk-io-mgr.cc                   | 1190 -----------------
 be/src/runtime/disk-io-mgr.h                    |  972 --------------
 be/src/runtime/exec-env.cc                      |    4 +-
 be/src/runtime/exec-env.h                       |    9 +-
 be/src/runtime/io/CMakeLists.txt                |   36 +
 be/src/runtime/io/disk-io-mgr-internal.h        |   78 ++
 be/src/runtime/io/disk-io-mgr-stress-test.cc    |   61 +
 be/src/runtime/io/disk-io-mgr-stress.cc         |  247 ++++
 be/src/runtime/io/disk-io-mgr-stress.h          |   95 ++
 be/src/runtime/io/disk-io-mgr-test.cc           | 1129 +++++++++++++++++
 be/src/runtime/io/disk-io-mgr.cc                | 1191 ++++++++++++++++++
 be/src/runtime/io/disk-io-mgr.h                 |  550 ++++++++
 be/src/runtime/io/handle-cache.h                |  197 +++
 be/src/runtime/io/handle-cache.inline.h         |  232 ++++
 be/src/runtime/io/request-context.cc            |  293 +++++
 be/src/runtime/io/request-context.h             |  403 ++++++
 be/src/runtime/io/request-ranges.h              |  471 +++++++
 be/src/runtime/io/scan-range.cc                 |  593 +++++++++
 be/src/runtime/row-batch.h                      |    2 +-
 be/src/runtime/runtime-state.cc                 |    2 +-
 be/src/runtime/runtime-state.h                  |    7 +-
 be/src/runtime/test-env.h                       |    2 +-
 be/src/runtime/tmp-file-mgr-test.cc             |   10 +-
 be/src/runtime/tmp-file-mgr.cc                  |   20 +-
 be/src/runtime/tmp-file-mgr.h                   |   20 +-
 49 files changed, 5702 insertions(+), 5595 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index bf7aa26..163567a 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -361,6 +361,7 @@ set (IMPALA_LINK_LIBS
   GlobalFlags
   histogram_proto
   ImpalaThrift
+  Io
   kudu_util
   krpc
   Rpc
@@ -386,6 +387,7 @@ set (IMPALA_LINK_LIBS
 if (BUILD_SHARED_LIBS)
   set (IMPALA_LINK_LIBS ${IMPALA_LINK_LIBS}
     BufferPool
+    Io
     Runtime
     Exec
     CodeGen

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index fcf58c6..7f20e31 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -32,6 +32,7 @@
 #include "common/names.h"
 
 using namespace impala;
+using namespace impala::io;
 
 const int BaseSequenceScanner::HEADER_SIZE = 1024;
 const int BaseSequenceScanner::SYNC_MARKER = -1;
@@ -48,7 +49,7 @@ Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
   // Issue just the header range for each file.  When the header is complete,
   // we'll issue the splits for that file.  Splits cannot be processed until the
   // header is parsed (the header object is then shared across splits for that file).
-  vector<DiskIoMgr::ScanRange*> header_ranges;
+  vector<ScanRange*> header_ranges;
   for (int i = 0; i < files.size(); ++i) {
     ScanRangeMetadata* metadata =
         static_cast<ScanRangeMetadata*>(files[i]->splits[0]->meta_data());
@@ -57,9 +58,9 @@ Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     // it is not cached.
     // TODO: add remote disk id and plumb that through to the io mgr.  It should have
     // 1 queue for each NIC as well?
-    DiskIoMgr::ScanRange* header_range = scan_node->AllocateScanRange(files[i]->fs,
+    ScanRange* header_range = scan_node->AllocateScanRange(files[i]->fs,
         files[i]->filename.c_str(), header_size, 0, metadata->partition_id, -1, false,
-        DiskIoMgr::BufferOpts::Uncached());
+        BufferOpts::Uncached());
     header_ranges.push_back(header_range);
   }
   // Issue the header ranges only. GetNextInternal() will issue the files' scan ranges
@@ -310,7 +311,7 @@ void BaseSequenceScanner::CloseFileRanges(const char* filename) {
   DCHECK(only_parsing_header_);
   HdfsFileDesc* desc = scan_node_->GetFileDesc(
       context_->partition_descriptor()->id(), filename);
-  const vector<DiskIoMgr::ScanRange*>& splits = desc->splits;
+  const vector<ScanRange*>& splits = desc->splits;
   for (int i = 0; i < splits.size(); ++i) {
     COUNTER_ADD(bytes_skipped_counter_, splits[i]->len());
     scan_node_->RangeComplete(file_format(), THdfsCompression::NONE);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 7fae959..f407877 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -27,6 +27,7 @@
 #include "exec/parquet-column-stats.h"
 #include "exec/scanner-context.inline.h"
 #include "runtime/collection-value-builder.h"
+#include "runtime/io/disk-io-mgr.h"
 #include "runtime/runtime-state.h"
 #include "runtime/runtime-filter.inline.h"
 #include "rpc/thrift-util.h"
@@ -35,6 +36,7 @@
 
 using std::move;
 using namespace impala;
+using namespace impala::io;
 
 DEFINE_double(parquet_min_filter_reject_ratio, 0.1, "(Advanced) If the percentage of "
     "rows rejected by a runtime filter drops below this value, the filter is disabled.");
@@ -67,7 +69,7 @@ const string PARQUET_MEM_LIMIT_EXCEEDED =
 
 Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     const std::vector<HdfsFileDesc*>& files) {
-  vector<DiskIoMgr::ScanRange*> footer_ranges;
+  vector<ScanRange*> footer_ranges;
   for (int i = 0; i < files.size(); ++i) {
     // If the file size is less than 12 bytes, it is an invalid Parquet file.
     if (files[i]->file_length < 12) {
@@ -80,10 +82,10 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     DCHECK_GE(footer_start, 0);
 
     // Try to find the split with the footer.
-    DiskIoMgr::ScanRange* footer_split = FindFooterSplit(files[i]);
+    ScanRange* footer_split = FindFooterSplit(files[i]);
 
     for (int j = 0; j < files[i]->splits.size(); ++j) {
-      DiskIoMgr::ScanRange* split = files[i]->splits[j];
+      ScanRange* split = files[i]->splits[j];
 
       DCHECK_LE(split->offset() + split->len(), files[i]->file_length);
       // If there are no materialized slots (such as count(*) over the table), we can
@@ -98,19 +100,19 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
         // is done here, followed by scan ranges for the columns of each row group within
         // the actual split (in InitColumns()). The original split is stored in the
         // metadata associated with the footer range.
-        DiskIoMgr::ScanRange* footer_range;
+        ScanRange* footer_range;
         if (footer_split != NULL) {
           footer_range = scan_node->AllocateScanRange(files[i]->fs,
               files[i]->filename.c_str(), footer_size, footer_start,
               split_metadata->partition_id, footer_split->disk_id(),
               footer_split->expected_local(),
-              DiskIoMgr::BufferOpts(footer_split->try_cache(), files[i]->mtime), split);
+              BufferOpts(footer_split->try_cache(), files[i]->mtime), split);
         } else {
           // If we did not find the last split, we know it is going to be a remote read.
           footer_range =
               scan_node->AllocateScanRange(files[i]->fs, files[i]->filename.c_str(),
                   footer_size, footer_start, split_metadata->partition_id, -1, false,
                  BufferOpts::Uncached(), split);
         }
 
         footer_ranges.push_back(footer_range);
@@ -125,10 +127,10 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
   return Status::OK();
 }
 
-DiskIoMgr::ScanRange* HdfsParquetScanner::FindFooterSplit(HdfsFileDesc* file) {
+ScanRange* HdfsParquetScanner::FindFooterSplit(HdfsFileDesc* file) {
   DCHECK(file != NULL);
   for (int i = 0; i < file->splits.size(); ++i) {
-    DiskIoMgr::ScanRange* split = file->splits[i];
+    ScanRange* split = file->splits[i];
     if (split->offset() + split->len() == file->file_length) return split;
   }
   return NULL;
@@ -341,7 +343,7 @@ static int64_t GetRowGroupMidOffset(const parquet::RowGroup& row_group) {
 
 // Returns true if 'row_group' overlaps with 'split_range'.
 static bool CheckRowGroupOverlapsSplit(const parquet::RowGroup& row_group,
-    const DiskIoMgr::ScanRange* split_range) {
+    const ScanRange* split_range) {
   int64_t row_group_start = GetColumnStartOffset(row_group.columns[0].meta_data);
 
   const parquet::ColumnMetaData& last_column =
@@ -598,7 +600,7 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
 }
 
 Status HdfsParquetScanner::NextRowGroup() {
-  const DiskIoMgr::ScanRange* split_range = static_cast<ScanRangeMetadata*>(
+  const ScanRange* split_range = static_cast<ScanRangeMetadata*>(
       metadata_range_->meta_data())->original_split;
   int64_t split_offset = split_range->offset();
   int64_t split_length = split_range->len();
@@ -1377,12 +1379,12 @@ Status HdfsParquetScanner::ProcessFooter() {
     DiskIoMgr* io_mgr = scan_node_->runtime_state()->io_mgr();
 
     // Read the header into the metadata buffer.
-    DiskIoMgr::ScanRange* metadata_range = scan_node_->AllocateScanRange(
+    ScanRange* metadata_range = scan_node_->AllocateScanRange(
         metadata_range_->fs(), filename(), metadata_size, metadata_start, partition_id,
         metadata_range_->disk_id(), metadata_range_->expected_local(),
-        DiskIoMgr::BufferOpts::ReadInto(metadata_buffer.buffer(), metadata_size));
+        BufferOpts::ReadInto(metadata_buffer.buffer(), metadata_size));
 
-    unique_ptr<DiskIoMgr::BufferDescriptor> io_buffer;
+    unique_ptr<BufferDescriptor> io_buffer;
     RETURN_IF_ERROR(
         io_mgr->Read(scan_node_->reader_context(), metadata_range, &io_buffer));
     DCHECK_EQ(io_buffer->buffer(), metadata_buffer.buffer());
@@ -1589,7 +1591,7 @@ Status HdfsParquetScanner::InitColumns(
   parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx];
 
   // All the scan ranges (one for each column).
-  vector<DiskIoMgr::ScanRange*> col_ranges;
+  vector<ScanRange*> col_ranges;
   // Used to validate that the number of values in each reader in column_readers_ is the
   // same.
   int num_values = -1;
@@ -1656,17 +1658,17 @@ Status HdfsParquetScanner::InitColumns(
           "filename '$1'", col_chunk.file_path, filename()));
     }
 
-    const DiskIoMgr::ScanRange* split_range =
+    const ScanRange* split_range =
         static_cast<ScanRangeMetadata*>(metadata_range_->meta_data())->original_split;
 
     // Determine if the column is completely contained within a local split.
     bool col_range_local = split_range->expected_local()
         && col_start >= split_range->offset()
         && col_end <= split_range->offset() + split_range->len();
-    DiskIoMgr::ScanRange* col_range = scan_node_->AllocateScanRange(metadata_range_->fs(),
+    ScanRange* col_range = scan_node_->AllocateScanRange(metadata_range_->fs(),
         filename(), col_len, col_start, partition_id, split_range->disk_id(),
         col_range_local,
-        DiskIoMgr::BufferOpts(split_range->try_cache(), file_desc->mtime));
+        BufferOpts(split_range->try_cache(), file_desc->mtime));
     col_ranges.push_back(col_range);
 
     // Get the stream that will be used for this column

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index e4b6ae7..0eea458 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -442,7 +442,7 @@ class HdfsParquetScanner : public HdfsScanner {
   ParquetFileVersion file_version_;
 
   /// Scan range for the metadata.
-  const DiskIoMgr::ScanRange* metadata_range_;
+  const io::ScanRange* metadata_range_;
 
   /// Pool to copy dictionary page buffer into. This pool is shared across all the
   /// pages in a column chunk.
@@ -585,7 +585,7 @@ class HdfsParquetScanner : public HdfsScanner {
 
   /// Find and return the last split in the file if it is assigned to this scan node.
   /// Returns NULL otherwise.
-  static DiskIoMgr::ScanRange* FindFooterSplit(HdfsFileDesc* file);
+  static io::ScanRange* FindFooterSplit(HdfsFileDesc* file);
 
   /// Process the file footer and parse file_metadata_.  This should be called with the
   /// last FOOTER_SIZE bytes in context_.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 9149097..62dbd6a 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -32,11 +32,12 @@
 #include "codegen/llvm-codegen.h"
 #include "common/logging.h"
 #include "common/object-pool.h"
-#include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
+#include "exprs/scalar-expr.h"
 #include "runtime/descriptors.h"
-#include "runtime/disk-io-mgr-reader-context.h"
 #include "runtime/hdfs-fs-cache.h"
+#include "runtime/io/disk-io-mgr.h"
+#include "runtime/io/request-context.h"
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/runtime-state.h"
 #include "util/disk-info.h"
@@ -54,6 +55,7 @@ DECLARE_bool(skip_file_runtime_filtering);
 
 namespace filesystem = boost::filesystem;
 using namespace impala;
+using namespace impala::io;
 using namespace strings;
 
 const string HdfsScanNodeBase::HDFS_SPLIT_STATS_DESC =
@@ -236,7 +238,7 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
     file_desc->splits.push_back(
         AllocateScanRange(file_desc->fs, file_desc->filename.c_str(), split.length,
             split.offset, split.partition_id, params.volume_id, expected_local,
-            DiskIoMgr::BufferOpts(try_cache, file_desc->mtime)));
+            BufferOpts(try_cache, file_desc->mtime)));
   }
 
   // Update server wide metrics for number of scan ranges and ranges that have
@@ -485,10 +487,10 @@ bool HdfsScanNodeBase::FilePassesFilterPredicates(
   return true;
 }
 
-DiskIoMgr::ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
+ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
     int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool expected_local,
-    const DiskIoMgr::BufferOpts& buffer_opts,
-    const DiskIoMgr::ScanRange* original_split) {
+    const BufferOpts& buffer_opts,
+    const ScanRange* original_split) {
   DCHECK_GE(disk_id, -1);
   // Require that the scan range is within [0, file_length). While this cannot be used
   // to guarantee safety (file_length metadata may be stale), it avoids different
@@ -502,21 +504,20 @@ DiskIoMgr::ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char*
 
   ScanRangeMetadata* metadata = runtime_state_->obj_pool()->Add(
         new ScanRangeMetadata(partition_id, original_split));
-  DiskIoMgr::ScanRange* range =
-      runtime_state_->obj_pool()->Add(new DiskIoMgr::ScanRange());
+  ScanRange* range = runtime_state_->obj_pool()->Add(new ScanRange);
   range->Reset(fs, file, len, offset, disk_id, expected_local, buffer_opts, metadata);
   return range;
 }
 
-DiskIoMgr::ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
+ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
     int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool try_cache,
-    bool expected_local, int mtime, const DiskIoMgr::ScanRange* original_split) {
+    bool expected_local, int mtime, const ScanRange* original_split) {
   return AllocateScanRange(fs, file, len, offset, partition_id, disk_id, expected_local,
-      DiskIoMgr::BufferOpts(try_cache, mtime), original_split);
+      BufferOpts(try_cache, mtime), original_split);
 }
 
 Status HdfsScanNodeBase::AddDiskIoRanges(
-    const vector<DiskIoMgr::ScanRange*>& ranges, int num_files_queued) {
+    const vector<ScanRange*>& ranges, int num_files_queued) {
   RETURN_IF_ERROR(runtime_state_->io_mgr()->AddScanRanges(reader_context_.get(), ranges));
   num_unqueued_files_.Add(-num_files_queued);
   DCHECK_GE(num_unqueued_files_.Load(), 0);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index e6b2154..923b50a 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -31,11 +31,11 @@
 #include "exec/filter-context.h"
 #include "exec/scan-node.h"
 #include "runtime/descriptors.h"
-#include "runtime/disk-io-mgr.h"
+#include "runtime/io/request-ranges.h"
 #include "util/avro-util.h"
+#include "util/container-util.h"
 #include "util/progress-updater.h"
 #include "util/spinlock.h"
-#include "util/container-util.h"
 
 namespace impala {
 
@@ -72,7 +72,7 @@ struct HdfsFileDesc {
   THdfsCompression::type file_compression;
 
   /// Splits (i.e. raw byte ranges) for this file, assigned to this scan node.
-  std::vector<DiskIoMgr::ScanRange*> splits;
+  std::vector<io::ScanRange*> splits;
 };
 
 /// Struct for additional metadata for scan ranges. This contains the partition id
@@ -84,9 +84,9 @@ struct ScanRangeMetadata {
   /// For parquet scan ranges we initially create a request for the file footer for each
   /// split; we store a pointer to the actual split so that we can recover its information
   /// for the scanner to process.
-  const DiskIoMgr::ScanRange* original_split;
+  const io::ScanRange* original_split;
 
-  ScanRangeMetadata(int64_t partition_id, const DiskIoMgr::ScanRange* original_split)
+  ScanRangeMetadata(int64_t partition_id, const io::ScanRange* original_split)
       : partition_id(partition_id), original_split(original_split) { }
 };
 
@@ -154,7 +154,7 @@ class HdfsScanNodeBase : public ScanNode {
   const HdfsTableDescriptor* hdfs_table() const { return hdfs_table_; }
   const AvroSchemaElement& avro_schema() const { return *avro_schema_.get(); }
   int skip_header_line_count() const { return skip_header_line_count_; }
-  DiskIoRequestContext* reader_context() const { return reader_context_.get(); }
+  io::RequestContext* reader_context() const { return reader_context_.get(); }
   bool optimize_parquet_count_star() const { return optimize_parquet_count_star_; }
   int parquet_count_star_slot_offset() const { return parquet_count_star_slot_offset_; }
 
@@ -204,22 +204,22 @@ class HdfsScanNodeBase : public ScanNode {
   /// If not NULL, the 'original_split' pointer is stored for reference in the scan range
   /// metadata of the scan range that is to be allocated.
   /// This is thread safe.
-  DiskIoMgr::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
+  io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
       int64_t offset, int64_t partition_id, int disk_id, bool expected_local,
-      const DiskIoMgr::BufferOpts& buffer_opts,
-      const DiskIoMgr::ScanRange* original_split = NULL);
+      const io::BufferOpts& buffer_opts,
+      const io::ScanRange* original_split = NULL);
 
   /// Old API for compatibility with text scanners (e.g. LZO text scanner).
-  DiskIoMgr::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
+  io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
       int64_t offset, int64_t partition_id, int disk_id, bool try_cache,
-      bool expected_local, int mtime, const DiskIoMgr::ScanRange* original_split = NULL);
+      bool expected_local, int mtime, const io::ScanRange* original_split = NULL);
 
   /// Adds ranges to the io mgr queue. 'num_files_queued' indicates how many file's scan
   /// ranges have been added completely.  A file's scan ranges are added completely if no
   /// new scanner threads will be needed to process that file besides the additional
   /// threads needed to process those in 'ranges'.
   /// Can be overridden to add scan-node specific actions like starting scanner threads.
-  virtual Status AddDiskIoRanges(const std::vector<DiskIoMgr::ScanRange*>& ranges,
+  virtual Status AddDiskIoRanges(const std::vector<io::ScanRange*>& ranges,
       int num_files_queued) WARN_UNUSED_RESULT;
 
   /// Adds all splits for file_desc to the io mgr queue and indicates one file has
@@ -336,7 +336,7 @@ class HdfsScanNodeBase : public ScanNode {
   const int parquet_count_star_slot_offset_;
 
   /// RequestContext object to use with the disk-io-mgr for reads.
-  std::unique_ptr<DiskIoRequestContext> reader_context_;
+  std::unique_ptr<io::RequestContext> reader_context_;
 
   /// Descriptor for tuples this scan node constructs
   const TupleDescriptor* tuple_desc_ = nullptr;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/hdfs-scan-node-mt.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.h b/be/src/exec/hdfs-scan-node-mt.h
index 4ce12fe..3502b18 100644
--- a/be/src/exec/hdfs-scan-node-mt.h
+++ b/be/src/exec/hdfs-scan-node-mt.h
@@ -50,7 +50,7 @@ class HdfsScanNodeMt : public HdfsScanNodeBase {
 
  private:
   /// Current scan range and corresponding scanner.
-  DiskIoMgr::ScanRange* scan_range_;
+  io::ScanRange* scan_range_;
   boost::scoped_ptr<ScannerContext> scanner_ctx_;
   boost::scoped_ptr<HdfsScanner> scanner_;
 };

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 78f2ffa..2d58c05 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -43,6 +43,7 @@ DECLARE_bool(skip_file_runtime_filtering);
 #endif
 
 using namespace impala;
+using namespace impala::io;
 
 // Amount of memory that we approximate a scanner thread will use not including IoBuffers.
 // The memory used does not vary considerably between file formats (just a couple of MBs).
@@ -251,7 +252,7 @@ void HdfsScanNode::AddMaterializedRowBatch(unique_ptr<RowBatch> row_batch) {
   materialized_row_batches_->AddBatch(move(row_batch));
 }
 
-Status HdfsScanNode::AddDiskIoRanges(const vector<DiskIoMgr::ScanRange*>& ranges,
+Status HdfsScanNode::AddDiskIoRanges(const vector<ScanRange*>& ranges,
     int num_files_queued) {
   RETURN_IF_ERROR(
       runtime_state_->io_mgr()->AddScanRanges(reader_context_.get(), ranges));
@@ -420,7 +421,7 @@ void HdfsScanNode::ScannerThread() {
     // to return if there's an error.
     ranges_issued_barrier_.Wait(SCANNER_THREAD_WAIT_TIME_MS, &unused);
 
-    DiskIoMgr::ScanRange* scan_range;
+    ScanRange* scan_range;
     // Take a snapshot of num_unqueued_files_ before calling GetNextRange().
     // We don't want num_unqueued_files_ to go to zero between the return from
     // GetNextRange() and the check for when all ranges are complete.
@@ -480,7 +481,7 @@ exit:
 }
 
 Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
-    MemPool* expr_results_pool, DiskIoMgr::ScanRange* scan_range) {
+    MemPool* expr_results_pool, ScanRange* scan_range) {
   DCHECK(scan_range != NULL);
 
   ScanRangeMetadata* metadata = static_cast<ScanRangeMetadata*>(scan_range->meta_data());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 30435c2..a1c97cf 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -29,7 +29,7 @@
 
 #include "exec/filter-context.h"
 #include "exec/hdfs-scan-node-base.h"
-#include "runtime/disk-io-mgr.h"
+#include "runtime/io/disk-io-mgr.h"
 #include "util/counting-barrier.h"
 #include "util/thread.h"
 
@@ -79,7 +79,7 @@ class HdfsScanNode : public HdfsScanNodeBase {
   bool done() const { return done_; }
 
   /// Adds ranges to the io mgr queue and starts up new scanner threads if possible.
-  virtual Status AddDiskIoRanges(const std::vector<DiskIoMgr::ScanRange*>& ranges,
+  virtual Status AddDiskIoRanges(const std::vector<io::ScanRange*>& ranges,
       int num_files_queued) WARN_UNUSED_RESULT;
 
   /// Adds a materialized row batch for the scan node.  This is called from scanner
@@ -166,7 +166,7 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// thread. 'filter_ctxs' is a clone of the class-wide filter_ctxs_, used to filter rows
   /// in this split.
   Status ProcessSplit(const std::vector<FilterContext>& filter_ctxs,
-      MemPool* expr_results_pool, DiskIoMgr::ScanRange* scan_range) WARN_UNUSED_RESULT;
+      MemPool* expr_results_pool, io::ScanRange* scan_range) WARN_UNUSED_RESULT;
 
   /// Returns true if there is enough memory (against the mem tracker limits) to
   /// have a scanner thread.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index d633734..487c6fc 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -40,6 +40,7 @@
 using boost::algorithm::ends_with;
 using boost::algorithm::to_lower;
 using namespace impala;
+using namespace impala::io;
 using namespace strings;
 
 const char* HdfsTextScanner::LLVM_CLASS_NAME = "class.impala::HdfsTextScanner";
@@ -74,7 +75,7 @@ HdfsTextScanner::~HdfsTextScanner() {
 
 Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     const vector<HdfsFileDesc*>& files) {
-  vector<DiskIoMgr::ScanRange*> compressed_text_scan_ranges;
+  vector<ScanRange*> compressed_text_scan_ranges;
   int compressed_text_files = 0;
   vector<HdfsFileDesc*> lzo_text_files;
   for (int i = 0; i < files.size(); ++i) {
@@ -95,7 +96,7 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
           // In order to decompress gzip-, snappy- and bzip2-compressed text files, we
           // need to read entire files. Only read a file if we're assigned the first split
           // to avoid reading multi-block files with multiple scanners.
-          DiskIoMgr::ScanRange* split = files[i]->splits[j];
+          ScanRange* split = files[i]->splits[j];
 
           // We only process the split that starts at offset 0.
           if (split->offset() != 0) {
@@ -114,10 +115,10 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
           DCHECK_GT(files[i]->file_length, 0);
           ScanRangeMetadata* metadata =
               static_cast<ScanRangeMetadata*>(split->meta_data());
-          DiskIoMgr::ScanRange* file_range = scan_node->AllocateScanRange(files[i]->fs,
+          ScanRange* file_range = scan_node->AllocateScanRange(files[i]->fs,
              files[i]->filename.c_str(), files[i]->file_length, 0,
              metadata->partition_id, split->disk_id(), split->expected_local(),
-              DiskIoMgr::BufferOpts(split->try_cache(), files[i]->mtime));
+              BufferOpts(split->try_cache(), files[i]->mtime));
          compressed_text_scan_ranges.push_back(file_range);
          scan_node->max_compressed_text_file_length()->Set(files[i]->file_length);
         }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index 77fac89..6d5e085 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -52,7 +52,7 @@ KuduScanNode::KuduScanNode(ObjectPool* pool, const TPlanNode& tnode,
     // This value is built the same way as it assumes that the scan node runs co-located
     // with a Kudu tablet server and that the tablet server is using disks similarly as
     // a datanode would.
-    max_row_batches = 10 * (DiskInfo::num_disks() + DiskIoMgr::REMOTE_NUM_DISKS);
+    max_row_batches = 10 * (DiskInfo::num_disks() + io::DiskIoMgr::REMOTE_NUM_DISKS);
   }
   materialized_row_batches_.reset(new RowBatchQueue(max_row_batches));
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 8cb195d..d9de769 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -21,6 +21,7 @@
 
 #include "exec/hdfs-scan-node-base.h"
 #include "exec/hdfs-scan-node.h"
+#include "runtime/io/disk-io-mgr.h"
 #include "runtime/exec-env.h"
 #include "runtime/mem-pool.h"
 #include "runtime/row-batch.h"
@@ -32,6 +33,7 @@
 #include "common/names.h"
 
 using namespace impala;
+using namespace impala::io;
 using namespace strings;
 
 static const int64_t INIT_READ_PAST_SIZE_BYTES = 64 * 1024;
@@ -43,7 +45,7 @@ static const int64_t INIT_READ_PAST_SIZE_BYTES = 64 * 1024;
 static const int64_t OUTPUT_BUFFER_BYTES_LEFT_INIT = 0;
 
 ScannerContext::ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node,
-    HdfsPartitionDescriptor* partition_desc, DiskIoMgr::ScanRange* scan_range,
+    HdfsPartitionDescriptor* partition_desc, ScanRange* scan_range,
     const vector<FilterContext>& filter_ctxs, MemPool* expr_results_pool)
   : state_(state),
     scan_node_(scan_node),
@@ -75,7 +77,7 @@ ScannerContext::Stream::Stream(ScannerContext* parent)
     boundary_buffer_(new StringBuffer(boundary_pool_.get())) {
 }
 
-ScannerContext::Stream* ScannerContext::AddStream(DiskIoMgr::ScanRange* range) {
+ScannerContext::Stream* ScannerContext::AddStream(ScanRange* range) {
   std::unique_ptr<Stream> stream(new Stream(this));
   stream->scan_range_ = range;
   stream->file_desc_ = scan_node_->GetFileDesc(partition_desc_->id(), stream->filename());
@@ -105,7 +107,7 @@ void ScannerContext::Stream::ReleaseCompletedResources(bool done) {
     scan_range_->Cancel(Status::CANCELLED);
   }
 
-  for (unique_ptr<DiskIoMgr::BufferDescriptor>& buffer : completed_io_buffers_) {
+  for (unique_ptr<BufferDescriptor>& buffer : completed_io_buffers_) {
     ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(buffer));
   }
   parent_->num_completed_io_buffers_ -= completed_io_buffers_.size();
@@ -164,9 +166,9 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
       return Status::OK();
     }
     int64_t partition_id = parent_->partition_descriptor()->id();
-    DiskIoMgr::ScanRange* range = parent_->scan_node_->AllocateScanRange(
+    ScanRange* range = parent_->scan_node_->AllocateScanRange(
         scan_range_->fs(), filename(), read_past_buffer_size, offset, partition_id,
-        scan_range_->disk_id(), false, DiskIoMgr::BufferOpts::Uncached());
+        scan_range_->disk_id(), false, BufferOpts::Uncached());
     RETURN_IF_ERROR(parent_->state_->io_mgr()->Read(
         parent_->scan_node_->reader_context(), range, &io_buffer_));
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index 216209f..3ad6753 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -27,7 +27,7 @@
 #include "common/compiler-util.h"
 #include "common/status.h"
 #include "exec/filter-context.h"
-#include "runtime/disk-io-mgr.h"
+#include "runtime/io/request-ranges.h"
 
 namespace impala {
 
@@ -65,7 +65,7 @@ class ScannerContext {
   /// get pushed to) and the scan range to process.
   /// This context starts with 1 stream.
   ScannerContext(RuntimeState*, HdfsScanNodeBase*, HdfsPartitionDescriptor*,
-      DiskIoMgr::ScanRange* scan_range, const std::vector<FilterContext>& filter_ctxs,
+      io::ScanRange* scan_range, const std::vector<FilterContext>& filter_ctxs,
       MemPool* expr_results_pool);
 
   /// Destructor verifies that all stream objects have been released.
@@ -125,7 +125,7 @@ class ScannerContext {
     bool eof() const { return file_offset() == file_len_; }
 
     const char* filename() { return scan_range_->file(); }
-    const DiskIoMgr::ScanRange* scan_range() { return scan_range_; }
+    const io::ScanRange* scan_range() { return scan_range_; }
     const HdfsFileDesc* file_desc() { return file_desc_; }
 
     /// Returns the buffer's current offset in the file.
@@ -176,7 +176,7 @@ class ScannerContext {
    private:
     friend class ScannerContext;
     ScannerContext* parent_;
-    DiskIoMgr::ScanRange* scan_range_;
+    io::ScanRange* scan_range_;
     const HdfsFileDesc* file_desc_;
 
     /// Total number of bytes returned from GetBytes()
@@ -195,7 +195,7 @@ class ScannerContext {
     int64_t next_read_past_size_bytes_;
 
     /// The current io buffer. This starts as NULL before we've read any bytes.
-    std::unique_ptr<DiskIoMgr::BufferDescriptor> io_buffer_;
+    std::unique_ptr<io::BufferDescriptor> io_buffer_;
 
     /// Next byte to read in io_buffer_
     uint8_t* io_buffer_pos_;
@@ -227,7 +227,7 @@ class ScannerContext {
     /// On the next GetBytes() call, these buffers are released (the caller by calling
     /// GetBytes() signals it is done with its previous bytes).  At this point the
     /// buffers are returned to the I/O manager.
-    std::deque<std::unique_ptr<DiskIoMgr::BufferDescriptor>> completed_io_buffers_;
+    std::deque<std::unique_ptr<io::BufferDescriptor>> completed_io_buffers_;
 
     Stream(ScannerContext* parent);
 
@@ -290,7 +290,7 @@ class ScannerContext {
 
   /// Add a stream to this ScannerContext for 'range'. Returns the added stream.
   /// The stream is created in the runtime state's object pool
-  Stream* AddStream(DiskIoMgr::ScanRange* range);
+  Stream* AddStream(io::ScanRange* range);
 
   /// Returns false if scan_node_ is multi-threaded and has been cancelled.
   /// Always returns false if the scan_node_ is not multi-threaded.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 41805af..0d4b61c 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -16,6 +16,7 @@
 # under the License.
 
 add_subdirectory(bufferpool)
+add_subdirectory(io)
 
 # where to put generated libraries
 set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime")
@@ -36,10 +37,6 @@ add_library(Runtime
   data-stream-sender.cc
   debug-options.cc
   descriptors.cc
-  disk-io-mgr.cc
-  disk-io-mgr-reader-context.cc
-  disk-io-mgr-scan-range.cc
-  disk-io-mgr-stress.cc
   exec-env.cc
   fragment-instance-state.cc
   hbase-table.cc
@@ -78,16 +75,11 @@ add_library(Runtime
 )
 add_dependencies(Runtime gen-deps)
 
-# This test runs forever so should not be part of 'make test'
-add_executable(disk-io-mgr-stress-test disk-io-mgr-stress-test.cc)
-target_link_libraries(disk-io-mgr-stress-test ${IMPALA_TEST_LINK_LIBS})
-
 ADD_BE_TEST(mem-pool-test)
 ADD_BE_TEST(free-pool-test)
 ADD_BE_TEST(string-buffer-test)
 ADD_BE_TEST(data-stream-test)
 ADD_BE_TEST(timestamp-test)
-ADD_BE_TEST(disk-io-mgr-test)
 ADD_BE_TEST(raw-value-test)
 ADD_BE_TEST(string-compare-test)
 ADD_BE_TEST(string-search-test)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/disk-io-mgr-handle-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-handle-cache.h b/be/src/runtime/disk-io-mgr-handle-cache.h
deleted file mode 100644
index 4ba2342..0000000
--- a/be/src/runtime/disk-io-mgr-handle-cache.h
+++ /dev/null
@@ -1,196 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_RUNTIME_DISK_IO_MGR_HANDLE_CACHE_H
-#define IMPALA_RUNTIME_DISK_IO_MGR_HANDLE_CACHE_H
-
-#include <array>
-#include <list>
-#include <map>
-#include <memory>
-
-#include <boost/thread/mutex.hpp>
-
-#include "common/hdfs.h"
-#include "common/status.h"
-#include "util/aligned-new.h"
-#include "util/impalad-metrics.h"
-#include "util/spinlock.h"
-#include "util/thread.h"
-
-namespace impala {
-
-/// This class is a small wrapper around the hdfsFile handle and the file system
-/// instance which is needed to close the file handle. The handle incorporates
-/// the last modified time of the file when it was opened. This is used to distinguish
-/// between file handles for files that can be updated or overwritten.
-class HdfsFileHandle {
- public:
-
-  /// Constructor will open the file
-  HdfsFileHandle(const hdfsFS& fs, const char* fname, int64_t mtime);
-
-  /// Destructor will close the file handle
-  ~HdfsFileHandle();
-
-  hdfsFile file() const { return hdfs_file_;  }
-  int64_t mtime() const { return mtime_; }
-  bool ok() const { return hdfs_file_ != nullptr; }
-
- private:
-  hdfsFS fs_;
-  hdfsFile hdfs_file_;
-  int64_t mtime_;
-};
-
-/// The FileHandleCache is a data structure that owns HdfsFileHandles to share between
-/// threads. The HdfsFileHandles are hash partitioned across NUM_PARTITIONS partitions.
-/// Each partition operates independently with its own locks, reducing contention
-/// between concurrent threads. The `capacity` is split between the partitions and is
-/// enforced independently.
-///
-/// Threads check out a file handle for exclusive access and return it when finished.
-/// If the file handle is not already present in the cache or all file handles for this
-/// file are checked out, the file handle is constructed and added to the cache.
-/// The cache can contain multiple file handles for the same file. If a file handle
-/// is checked out, it cannot be evicted from the cache. In this case, a cache can
-/// exceed the specified capacity.
-///
-/// The file handle cache is currently not suitable for remote files that maintain a
-/// connection as part of the handle. Most remote systems have a limit on the number
-/// of concurrent connections, and file handles in the cache would be counted towards
-/// that limit.
-///
-/// If there is a file handle in the cache and the underlying file is deleted,
-/// the file handle might keep the file from being deleted at the OS level. This can
-/// take up disk space and impact correctness. To avoid this, the cache will evict any
-/// file handle that has been unused for longer than threshold specified by
-/// `unused_handle_timeout_secs`. Eviction is disabled when the threshold is 0.
-///
-/// TODO: The cache should also evict file handles more aggressively if the file handle's
-/// mtime is older than the file's current mtime.
-template <size_t NUM_PARTITIONS>
-class FileHandleCache {
- public:
-  /// Instantiates the cache with `capacity` split evenly across NUM_PARTITIONS
-  /// partitions. If the capacity does not split evenly, then the capacity is rounded
-  /// up. The cache will age out any file handle that is unused for
-  /// `unused_handle_timeout_secs` seconds. Age out is disabled if this is set to zero.
-  FileHandleCache(size_t capacity, uint64_t unused_handle_timeout_secs);
-
-  /// Destructor is only called for backend tests
-  ~FileHandleCache();
-
-  /// Starts up a thread that monitors the age of file handles and evicts any that
-  /// exceed the limit.
-  Status Init() WARN_UNUSED_RESULT;
-
-  /// Get a file handle from the cache for the specified filename (fname) and
-  /// last modification time (mtime). This will hash the filename to determine
-  /// which partition to use for this file handle.
-  ///
-  /// If 'require_new_handle' is false and the partition contains an available handle,
-  /// the handle is returned and cache_hit is set to true. Otherwise, the partition will
-  /// try to construct a file handle and add it to the partition. On success, the new
-  /// file handle will be returned with cache_hit set to false. On failure, nullptr will
-  /// be returned. In either case, the partition may evict a file handle to make room
-  /// for the new file handle.
-  ///
-  /// This obtains exclusive control over the returned file handle. It must be paired
-  /// with a call to ReleaseFileHandle to release exclusive control.
-  HdfsFileHandle* GetFileHandle(const hdfsFS& fs, std::string* fname, int64_t mtime,
-      bool require_new_handle, bool* cache_hit);
-
-  /// Release the exclusive hold on the specified file handle (which was obtained
-  /// by calling GetFileHandle). The cache may evict a file handle if the cache is
-  /// above capacity. If 'destroy_handle' is true, immediately remove this handle
-  /// from the cache.
-  void ReleaseFileHandle(std::string* fname, HdfsFileHandle* fh, bool destroy_handle);
-
- private:
-  struct FileHandleEntry;
-  typedef std::multimap<std::string, FileHandleEntry> MapType;
-
-  struct LruListEntry {
-    LruListEntry(typename MapType::iterator map_entry_in);
-    typename MapType::iterator map_entry;
-    uint64_t timestamp_seconds;
-  };
-  typedef std::list<LruListEntry> LruListType;
-
-  struct FileHandleEntry {
-    FileHandleEntry(HdfsFileHandle* fh_in, LruListType& lru_list)
-    : fh(fh_in), lru_entry(lru_list.end()) {}
-    std::unique_ptr<HdfsFileHandle> fh;
-
-    /// in_use is true for a file handle checked out via GetFileHandle() that has not
-    /// been returned via ReleaseFileHandle().
-    bool in_use = false;
-
-    /// Iterator to this element's location in the LRU list. This only points to a
-    /// valid location when in_use is true. For error-checking, this is set to
-    /// lru_list.end() when in_use is false.
-    typename LruListType::iterator lru_entry;
-  };
-
-  /// Each partition operates independently, and thus has its own cache, LRU list,
-  /// and corresponding lock. To avoid contention on the lock_ due to false sharing
-  /// the partitions are aligned to cache line boundaries.
-  struct FileHandleCachePartition : public CacheLineAligned {
-    /// Protects access to cache and lru_list.
-    SpinLock lock;
-
-    /// Multimap from the file name to the file handles for that file. The cache
-    /// can contain multiple file handles for the same file and some may have
-    /// different mtimes if the file is being modified. All file handles are always
-    /// owned by the cache.
-    MapType cache;
-
-    /// The LRU list only contains file handles that are not in use.
-    LruListType lru_list;
-
-    /// Maximum number of file handles in cache without evicting unused file handles.
-    /// It is not a strict limit, and can be exceeded if all file handles are in use.
-    size_t capacity;
-
-    /// Current number of file handles in the cache
-    size_t size;
-  };
-
-  /// Periodic check to evict unused file handles. Only executed by eviction_thread_.
-  void EvictHandlesLoop();
-  static const int64_t EVICT_HANDLES_PERIOD_MS = 1000;
-
-  /// If the partition is above its capacity, evict the oldest unused file handles to
-  /// enforce the capacity.
-  void EvictHandles(FileHandleCachePartition& p);
-
-  std::array<FileHandleCachePartition, NUM_PARTITIONS> cache_partitions_;
-
-  /// Maximum time before an unused file handle is aged out of the cache.
-  /// Aging out is disabled if this is set to 0.
-  uint64_t unused_handle_timeout_secs_;
-
-  /// Thread to check for unused file handles to evict. This thread will exit when
-  /// the shut_down_promise_ is set.
-  std::unique_ptr<Thread> eviction_thread_;
-  Promise<bool> shut_down_promise_;
-};
-
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/disk-io-mgr-handle-cache.inline.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-handle-cache.inline.h b/be/src/runtime/disk-io-mgr-handle-cache.inline.h
deleted file mode 100644
index 3068971..0000000
--- a/be/src/runtime/disk-io-mgr-handle-cache.inline.h
+++ /dev/null
@@ -1,231 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <tuple>
-
-#include "runtime/disk-io-mgr-handle-cache.h"
-#include "util/hash-util.h"
-#include "util/time.h"
-
-#ifndef IMPALA_RUNTIME_DISK_IO_MGR_HANDLE_CACHE_INLINE_H
-#define IMPALA_RUNTIME_DISK_IO_MGR_HANDLE_CACHE_INLINE_H
-
-namespace impala {
-
-HdfsFileHandle::HdfsFileHandle(const hdfsFS& fs, const char* fname,
-    int64_t mtime)
-    : fs_(fs), hdfs_file_(hdfsOpenFile(fs, fname, O_RDONLY, 0, 0, 0)), mtime_(mtime) {
-  ImpaladMetrics::IO_MGR_NUM_CACHED_FILE_HANDLES->Increment(1L);
-  VLOG_FILE << "hdfsOpenFile() file=" << fname << " fid=" << hdfs_file_;
-}
-
-HdfsFileHandle::~HdfsFileHandle() {
-  if (hdfs_file_ != nullptr && fs_ != nullptr) {
-    ImpaladMetrics::IO_MGR_NUM_CACHED_FILE_HANDLES->Increment(-1L);
-    VLOG_FILE << "hdfsCloseFile() fid=" << hdfs_file_;
-    hdfsCloseFile(fs_, hdfs_file_);
-  }
-  fs_ = nullptr;
-  hdfs_file_ = nullptr;
-}
-
-template <size_t NUM_PARTITIONS>
-  FileHandleCache<NUM_PARTITIONS>::FileHandleCache(size_t capacity,
-      uint64_t unused_handle_timeout_secs)
-  : unused_handle_timeout_secs_(unused_handle_timeout_secs) {
-  DCHECK_GT(NUM_PARTITIONS, 0);
-  size_t remainder = capacity % NUM_PARTITIONS;
-  size_t base_capacity = capacity / NUM_PARTITIONS;
-  size_t partition_capacity = (remainder > 0 ? base_capacity + 1 : base_capacity);
-  for (FileHandleCachePartition& p : cache_partitions_) {
-    p.size = 0;
-    p.capacity = partition_capacity;
-  }
-}
-
-template <size_t NUM_PARTITIONS>
-FileHandleCache<NUM_PARTITIONS>::LruListEntry::LruListEntry(
-    typename MapType::iterator map_entry_in)
-     : map_entry(map_entry_in), timestamp_seconds(MonotonicSeconds()) {}
-
-template <size_t NUM_PARTITIONS>
-FileHandleCache<NUM_PARTITIONS>::~FileHandleCache() {
-  shut_down_promise_.Set(true);
-  if (eviction_thread_ != nullptr) eviction_thread_->Join();
-}
-
-template <size_t NUM_PARTITIONS>
-Status FileHandleCache<NUM_PARTITIONS>::Init() {
-  return Thread::Create("disk-io-mgr-handle-cache", "File Handle Timeout",
-      &FileHandleCache<NUM_PARTITIONS>::EvictHandlesLoop, this, &eviction_thread_);
-}
-
-template <size_t NUM_PARTITIONS>
-HdfsFileHandle* FileHandleCache<NUM_PARTITIONS>::GetFileHandle(
-    const hdfsFS& fs, std::string* fname, int64_t mtime, bool require_new_handle,
-    bool* cache_hit) {
-  // Hash the key and get appropriate partition
-  int index = HashUtil::Hash(fname->data(), fname->size(), 0) % NUM_PARTITIONS;
-  FileHandleCachePartition& p = cache_partitions_[index];
-  boost::lock_guard<SpinLock> g(p.lock);
-  pair<typename MapType::iterator, typename MapType::iterator> range =
-    p.cache.equal_range(*fname);
-
-  // If this requires a new handle, skip to the creation codepath. Otherwise,
-  // find an unused entry with the same mtime
-  FileHandleEntry* ret_elem = nullptr;
-  if (!require_new_handle) {
-    while (range.first != range.second) {
-      FileHandleEntry* elem = &range.first->second;
-      if (!elem->in_use && elem->fh->mtime() == mtime) {
-        // This element is currently in the lru_list, which means that 
lru_entry must
-        // be an iterator pointing into the lru_list.
-        DCHECK(elem->lru_entry != p.lru_list.end());
-        // Remove the element from the lru_list and designate that it is not on
-        // the lru_list by resetting its iterator to point to the end of the 
list.
-        p.lru_list.erase(elem->lru_entry);
-        elem->lru_entry = p.lru_list.end();
-        ret_elem = elem;
-        *cache_hit = true;
-        break;
-      }
-      ++range.first;
-    }
-  }
-
-  // There was no entry that was free or caller asked for a new handle
-  if (!ret_elem) {
-    *cache_hit = false;
-    // Create a new entry and move it into the map
-    HdfsFileHandle* new_fh = new HdfsFileHandle(fs, fname->data(), mtime);
-    if (!new_fh->ok()) {
-      delete new_fh;
-      return nullptr;
-    }
-    FileHandleEntry entry(new_fh, p.lru_list);
-    typename MapType::iterator new_it = p.cache.emplace_hint(range.second,
-        *fname, std::move(entry));
-    ret_elem = &new_it->second;
-    ++p.size;
-    if (p.size > p.capacity) EvictHandles(p);
-  }
-
-  DCHECK(ret_elem->fh.get() != nullptr);
-  DCHECK(!ret_elem->in_use);
-  ret_elem->in_use = true;
-  ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(1L);
-  return ret_elem->fh.get();
-}
-
-template <size_t NUM_PARTITIONS>
-void FileHandleCache<NUM_PARTITIONS>::ReleaseFileHandle(std::string* fname,
-    HdfsFileHandle* fh, bool destroy_handle) {
-  DCHECK(fh != nullptr);
-  // Hash the key and get appropriate partition
-  int index = HashUtil::Hash(fname->data(), fname->size(), 0) % NUM_PARTITIONS;
-  FileHandleCachePartition& p = cache_partitions_[index];
-  boost::lock_guard<SpinLock> g(p.lock);
-  pair<typename MapType::iterator, typename MapType::iterator> range =
-    p.cache.equal_range(*fname);
-
-  // TODO: This can be optimized by maintaining some state in the file handle about
-  // its location in the map.
-  typename MapType::iterator release_it = range.first;
-  while (release_it != range.second) {
-    FileHandleEntry* elem = &release_it->second;
-    if (elem->fh.get() == fh) break;
-    ++release_it;
-  }
-  DCHECK(release_it != range.second);
-
-  // This file handle is no longer referenced
-  FileHandleEntry* release_elem = &release_it->second;
-  DCHECK(release_elem->in_use);
-  release_elem->in_use = false;
-  ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(-1L);
-  if (destroy_handle) {
-    --p.size;
-    p.cache.erase(release_it);
-    return;
-  }
-  // Hdfs can use some memory for readahead buffering. Calling unbuffer reduces
-  // this buffering so that the file handle takes up less memory when in the cache.
-  // If unbuffering is not supported, then hdfsUnbufferFile() will return a non-zero
-  // return code, and we close the file handle and remove it from the cache.
-  if (hdfsUnbufferFile(release_elem->fh->file()) == 0) {
-    // This FileHandleEntry must not be in the lru list already, because it was
-    // in use. Verify this by checking that the lru_entry is pointing to the end,
-    // which cannot be true for any element in the lru list.
-    DCHECK(release_elem->lru_entry == p.lru_list.end());
-    // Add this to the lru list, establishing links in both directions.
-    // The FileHandleEntry has an iterator to the LruListEntry and the
-    // LruListEntry has an iterator to the location of the FileHandleEntry in
-    // the cache.
-    release_elem->lru_entry = p.lru_list.emplace(p.lru_list.end(), release_it);
-    if (p.size > p.capacity) EvictHandles(p);
-  } else {
-    VLOG_FILE << "FS does not support file handle unbuffering, closing file="
-              << fname;
-    --p.size;
-    p.cache.erase(release_it);
-  }
-}
-
-template <size_t NUM_PARTITIONS>
-void FileHandleCache<NUM_PARTITIONS>::EvictHandlesLoop() {
-  while (true) {
-    for (FileHandleCachePartition& p : cache_partitions_) {
-      boost::lock_guard<SpinLock> g(p.lock);
-      EvictHandles(p);
-    }
-    // This Get() will time out until shutdown, when the promise is set.
-    bool timed_out;
-    shut_down_promise_.Get(EVICT_HANDLES_PERIOD_MS, &timed_out);
-    if (!timed_out) break;
-  }
-  // The promise must be set to true.
-  DCHECK(shut_down_promise_.IsSet());
-  DCHECK(shut_down_promise_.Get());
-}
-
-template <size_t NUM_PARTITIONS>
-void FileHandleCache<NUM_PARTITIONS>::EvictHandles(
-    FileHandleCache<NUM_PARTITIONS>::FileHandleCachePartition& p) {
-  uint64_t now = MonotonicSeconds();
-  uint64_t oldest_allowed_timestamp =
-      now > unused_handle_timeout_secs_ ? now - unused_handle_timeout_secs_ : 0;
-  while (p.lru_list.size() > 0) {
-    // Peek at the oldest element
-    LruListEntry oldest_entry = p.lru_list.front();
-    typename MapType::iterator oldest_entry_map_it = oldest_entry.map_entry;
-    uint64_t oldest_entry_timestamp = oldest_entry.timestamp_seconds;
-    // If the oldest element does not need to be aged out and the cache is not over
-    // capacity, then we are done and there is nothing to evict.
-    if (p.size <= p.capacity && (unused_handle_timeout_secs_ == 0 ||
-        oldest_entry_timestamp >= oldest_allowed_timestamp)) {
-      return;
-    }
-    // Evict the oldest element
-    DCHECK(!oldest_entry_map_it->second.in_use);
-    p.cache.erase(oldest_entry_map_it);
-    p.lru_list.pop_front();
-    --p.size;
-  }
-}
-
-}
-#endif
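
For readers skimming the move, the deleted template above boils down to a common
concurrency pattern: hash the key to pick one of a fixed number of partitions, take
only that partition's lock, and keep unreferenced entries on a per-partition LRU
list for eviction. Below is a minimal standalone sketch of that pattern; the names
(LruCacheSketch, Partition, Entry) are illustrative only, and it simplifies away
the multimap, metrics and handle-aging logic of the real cache.

#include <cstddef>
#include <functional>
#include <iterator>
#include <list>
#include <mutex>
#include <string>
#include <unordered_map>

template <size_t NUM_PARTITIONS>
class LruCacheSketch {
 public:
  explicit LruCacheSketch(size_t capacity_per_partition)
    : capacity_(capacity_per_partition) {}

  // Insert or refresh a key. Only the owning partition is locked, so threads
  // operating on different partitions never contend.
  void Put(const std::string& key, int value) {
    Partition& p = partitions_[std::hash<std::string>()(key) % NUM_PARTITIONS];
    std::lock_guard<std::mutex> g(p.lock);
    auto it = p.map.find(key);
    if (it != p.map.end()) {
      // Already cached: move to the most-recently-used end of the LRU list.
      p.lru.splice(p.lru.end(), p.lru, it->second.lru_pos);
      it->second.value = value;
      return;
    }
    p.lru.push_back(key);
    p.map.emplace(key, Entry{value, std::prev(p.lru.end())});
    if (p.map.size() > capacity_) {
      // Over capacity: evict the least-recently-used entry (list front).
      p.map.erase(p.lru.front());
      p.lru.pop_front();
    }
  }

 private:
  struct Entry {
    int value;
    std::list<std::string>::iterator lru_pos;  // Back-pointer into 'lru'.
  };
  struct Partition {
    std::mutex lock;  // Guards 'map' and 'lru' for this partition only.
    std::unordered_map<std::string, Entry> map;
    std::list<std::string> lru;  // Front = least recently used.
  };
  const size_t capacity_;
  Partition partitions_[NUM_PARTITIONS];
};

The real cache differs in that an entry leaves the LRU list entirely while a caller
holds the handle (hence the lru_entry == lru_list.end() DCHECK above), and a
background thread additionally ages out entries older than
unused_handle_timeout_secs_, but the partition-plus-LRU skeleton is the same.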

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/disk-io-mgr-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-internal.h b/be/src/runtime/disk-io-mgr-internal.h
deleted file mode 100644
index cc50af7..0000000
--- a/be/src/runtime/disk-io-mgr-internal.h
+++ /dev/null
@@ -1,76 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_RUNTIME_DISK_IO_MGR_INTERNAL_H
-#define IMPALA_RUNTIME_DISK_IO_MGR_INTERNAL_H
-
-#include <unistd.h>
-#include <queue>
-#include <boost/thread/locks.hpp>
-#include <gutil/strings/substitute.h>
-
-#include "common/logging.h"
-#include "runtime/disk-io-mgr-reader-context.h"
-#include "runtime/disk-io-mgr.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/thread-resource-mgr.h"
-#include "util/condition-variable.h"
-#include "util/cpu-info.h"
-#include "util/debug-util.h"
-#include "util/disk-info.h"
-#include "util/filesystem-util.h"
-#include "util/hdfs-util.h"
-#include "util/impalad-metrics.h"
-
-/// This file contains internal structures shared between submodules of the IoMgr. Users
-/// of the IoMgr do not need to include this file.
-namespace impala {
-
-/// Per disk state
-struct DiskIoMgr::DiskQueue {
-  /// Disk id (0-based)
-  int disk_id;
-
-  /// Lock that protects access to 'request_contexts' and 'work_available'
-  boost::mutex lock;
-
-  /// Condition variable to signal the disk threads that there is work to do or the
-  /// thread should shut down.  A disk thread will be woken up when there is a reader
-  /// added to the queue. A reader is only on the queue when it has at least one
-  /// scan range that is not blocked on available buffers.
-  ConditionVariable work_available;
-
-  /// list of all request contexts that have work queued on this disk
-  std::list<DiskIoRequestContext*> request_contexts;
-
-  /// Enqueue the request context to the disk queue.  The DiskQueue lock must not be taken.
-  inline void EnqueueContext(DiskIoRequestContext* worker) {
-    {
-      boost::unique_lock<boost::mutex> disk_lock(lock);
-      /// Check that the reader is not already on the queue
-      DCHECK(find(request_contexts.begin(), request_contexts.end(), worker) ==
-          request_contexts.end());
-      request_contexts.push_back(worker);
-    }
-    work_available.NotifyAll();
-  }
-
-  DiskQueue(int id) : disk_id(id) {}
-};
-}
-
-#endif
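
The EnqueueContext() pattern above, which mutates the queue under the mutex and
signals the condition variable only after releasing it, is the standard
producer/consumer handoff. A minimal standalone sketch with standard-library types
follows; Worker stands in for DiskIoRequestContext, and all names here are
illustrative rather than Impala's API.

#include <condition_variable>
#include <deque>
#include <mutex>

struct Worker {};  // Placeholder for the per-reader context.

class DiskQueueSketch {
 public:
  // Producer side; mirrors EnqueueContext() above.
  void Enqueue(Worker* w) {
    {
      std::lock_guard<std::mutex> l(lock_);
      queue_.push_back(w);
    }
    // Notify after dropping the lock so a woken thread can acquire it
    // immediately instead of blocking against the notifier.
    work_available_.notify_all();
  }

  // Consumer side: disk threads block here until work arrives or the queue
  // shuts down. Returns nullptr on shutdown.
  Worker* Dequeue() {
    std::unique_lock<std::mutex> l(lock_);
    work_available_.wait(l, [this] { return shut_down_ || !queue_.empty(); });
    if (queue_.empty()) return nullptr;
    Worker* w = queue_.front();
    queue_.pop_front();
    return w;
  }

  void Shutdown() {
    {
      std::lock_guard<std::mutex> l(lock_);
      shut_down_ = true;
    }
    work_available_.notify_all();  // Wake every blocked disk thread.
  }

 private:
  std::mutex lock_;
  std::condition_variable work_available_;
  std::deque<Worker*> queue_;
  bool shut_down_ = false;
};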

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/disk-io-mgr-reader-context.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-reader-context.cc b/be/src/runtime/disk-io-mgr-reader-context.cc
deleted file mode 100644
index d62545b..0000000
--- a/be/src/runtime/disk-io-mgr-reader-context.cc
+++ /dev/null
@@ -1,292 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/disk-io-mgr-internal.h"
-
-#include "common/names.h"
-
-using namespace impala;
-
-void DiskIoRequestContext::Cancel(const Status& status) {
-  DCHECK(!status.ok());
-
-  // Callbacks are collected in this vector and invoked while no lock is held.
-  vector<WriteRange::WriteDoneCallback> write_callbacks;
-  {
-    lock_guard<mutex> lock(lock_);
-    DCHECK(Validate()) << endl << DebugString();
-
-    // Already being cancelled
-    if (state_ == DiskIoRequestContext::Cancelled) return;
-
-    DCHECK(status_.ok());
-    status_ = status;
-
-    // The reader will be put into a cancelled state until call cleanup is complete.
-    state_ = DiskIoRequestContext::Cancelled;
-
-    // Cancel all scan ranges for this reader. Each range could be on one of
-    // four queues.
-    for (int i = 0; i < disk_states_.size(); ++i) {
-      DiskIoRequestContext::PerDiskState& state = disk_states_[i];
-      RequestRange* range = NULL;
-      while ((range = state.in_flight_ranges()->Dequeue()) != NULL) {
-        if (range->request_type() == RequestType::READ) {
-          static_cast<ScanRange*>(range)->Cancel(status);
-        } else {
-          DCHECK(range->request_type() == RequestType::WRITE);
-          write_callbacks.push_back(static_cast<WriteRange*>(range)->callback_);
-        }
-      }
-
-      ScanRange* scan_range;
-      while ((scan_range = state.unstarted_scan_ranges()->Dequeue()) != NULL) {
-        scan_range->Cancel(status);
-      }
-      WriteRange* write_range;
-      while ((write_range = state.unstarted_write_ranges()->Dequeue()) != NULL) {
-        write_callbacks.push_back(write_range->callback_);
-      }
-    }
-
-    ScanRange* range = NULL;
-    while ((range = ready_to_start_ranges_.Dequeue()) != NULL) {
-      range->Cancel(status);
-    }
-    while ((range = blocked_ranges_.Dequeue()) != NULL) {
-      range->Cancel(status);
-    }
-    while ((range = cached_ranges_.Dequeue()) != NULL) {
-      range->Cancel(status);
-    }
-
-    // Schedule reader on all disks. The disks will notice it is cancelled and do any
-    // required cleanup
-    for (int i = 0; i < disk_states_.size(); ++i) {
-      DiskIoRequestContext::PerDiskState& state = disk_states_[i];
-      state.ScheduleContext(this, i);
-    }
-  }
-
-  for (const WriteRange::WriteDoneCallback& write_callback: write_callbacks) {
-    write_callback(status_);
-  }
-
-  // Signal reader and unblock the GetNext/Read thread.  That read will fail with
-  // a cancelled status.
-  ready_to_start_ranges_cv_.NotifyAll();
-}
-
-void DiskIoRequestContext::CancelAndMarkInactive() {
-  Cancel(Status::CANCELLED);
-
-  boost::unique_lock<boost::mutex> l(lock_);
-  DCHECK_NE(state_, Inactive);
-  DCHECK(Validate()) << endl << DebugString();
-
-  // Wait until the ranges finish up.
-  while (num_disks_with_ranges_ > 0) disks_complete_cond_var_.Wait(l);
-
-  // Validate that no buffers were leaked from this context.
-  DCHECK_EQ(num_buffers_in_reader_.Load(), 0) << endl << DebugString();
-  DCHECK_EQ(num_used_buffers_.Load(), 0) << endl << DebugString();
-  DCHECK(Validate()) << endl << DebugString();
-  state_ = Inactive;
-}
-
-void DiskIoRequestContext::AddRequestRange(
-    DiskIoMgr::RequestRange* range, bool schedule_immediately) {
-  // DCHECK(lock_.is_locked()); // TODO: boost should have this API
-  DiskIoRequestContext::PerDiskState& state = disk_states_[range->disk_id()];
-  if (state.done()) {
-    DCHECK_EQ(state.num_remaining_ranges(), 0);
-    state.set_done(false);
-    ++num_disks_with_ranges_;
-  }
-
-  bool schedule_context;
-  if (range->request_type() == RequestType::READ) {
-    DiskIoMgr::ScanRange* scan_range = static_cast<DiskIoMgr::ScanRange*>(range);
-    if (schedule_immediately) {
-      ScheduleScanRange(scan_range);
-    } else {
-      state.unstarted_scan_ranges()->Enqueue(scan_range);
-      num_unstarted_scan_ranges_.Add(1);
-    }
-    // If next_scan_range_to_start is NULL, schedule this DiskIoRequestContext so that it will
-    // be set. If it's not NULL, this context will be scheduled when GetNextRange() is
-    // invoked.
-    schedule_context = state.next_scan_range_to_start() == NULL;
-  } else {
-    DCHECK(range->request_type() == RequestType::WRITE);
-    DCHECK(!schedule_immediately);
-    DiskIoMgr::WriteRange* write_range = static_cast<DiskIoMgr::WriteRange*>(range);
-    state.unstarted_write_ranges()->Enqueue(write_range);
-
-    // ScheduleContext() has no effect if the context is already scheduled,
-    // so this is safe.
-    schedule_context = true;
-  }
-
-  if (schedule_context) state.ScheduleContext(this, range->disk_id());
-  ++state.num_remaining_ranges();
-}
-
-DiskIoRequestContext::DiskIoRequestContext(
-    DiskIoMgr* parent, int num_disks, MemTracker* tracker)
-  : parent_(parent), mem_tracker_(tracker), disk_states_(num_disks) {}
-
-// Dumps out request context information. Lock should be taken by caller
-string DiskIoRequestContext::DebugString() const {
-  stringstream ss;
-  ss << endl << "  DiskIoRequestContext: " << (void*)this << " (state=";
-  if (state_ == DiskIoRequestContext::Inactive) ss << "Inactive";
-  if (state_ == DiskIoRequestContext::Cancelled) ss << "Cancelled";
-  if (state_ == DiskIoRequestContext::Active) ss << "Active";
-  if (state_ != DiskIoRequestContext::Inactive) {
-    ss << " status_=" << (status_.ok() ? "OK" : status_.GetDetail())
-       << " #ready_buffers=" << num_ready_buffers_.Load()
-       << " #used_buffers=" << num_used_buffers_.Load()
-       << " #num_buffers_in_reader=" << num_buffers_in_reader_.Load()
-       << " #finished_scan_ranges=" << num_finished_ranges_.Load()
-       << " #disk_with_ranges=" << num_disks_with_ranges_
-       << " #disks=" << num_disks_with_ranges_;
-    for (int i = 0; i < disk_states_.size(); ++i) {
-      ss << endl << "   " << i << ": "
-         << "is_on_queue=" << disk_states_[i].is_on_queue()
-         << " done=" << disk_states_[i].done()
-         << " #num_remaining_scan_ranges=" << disk_states_[i].num_remaining_ranges()
-         << " #in_flight_ranges=" << disk_states_[i].in_flight_ranges()->size()
-         << " #unstarted_scan_ranges=" << disk_states_[i].unstarted_scan_ranges()->size()
-         << " #unstarted_write_ranges="
-         << disk_states_[i].unstarted_write_ranges()->size()
-         << " #reading_threads=" << disk_states_[i].num_threads_in_op();
-    }
-  }
-  ss << ")";
-  return ss.str();
-}
-
-bool DiskIoRequestContext::Validate() const {
-  if (state_ == DiskIoRequestContext::Inactive) {
-    LOG(WARNING) << "state_ == DiskIoRequestContext::Inactive";
-    return false;
-  }
-
-  if (num_used_buffers_.Load() < 0) {
-    LOG(WARNING) << "num_used_buffers_ < 0: #used=" << num_used_buffers_.Load();
-    return false;
-  }
-
-  if (num_ready_buffers_.Load() < 0) {
-    LOG(WARNING) << "num_ready_buffers_ < 0: #used=" << num_ready_buffers_.Load();
-    return false;
-  }
-
-  int total_unstarted_ranges = 0;
-  for (int i = 0; i < disk_states_.size(); ++i) {
-    const PerDiskState& state = disk_states_[i];
-    bool on_queue = state.is_on_queue();
-    int num_reading_threads = state.num_threads_in_op();
-
-    total_unstarted_ranges += state.unstarted_scan_ranges()->size();
-
-    if (num_reading_threads < 0) {
-      LOG(WARNING) << "disk_id=" << i
-                   << "state.num_threads_in_read < 0: #threads="
-                   << num_reading_threads;
-      return false;
-    }
-
-    if (state_ != DiskIoRequestContext::Cancelled) {
-      if (state.unstarted_scan_ranges()->size() + state.in_flight_ranges()->size() >
-          state.num_remaining_ranges()) {
-        LOG(WARNING) << "disk_id=" << i
-                     << " state.unstarted_ranges.size() + state.in_flight_ranges.size()"
-                     << " > state.num_remaining_ranges:"
-                     << " #unscheduled=" << state.unstarted_scan_ranges()->size()
-                     << " #in_flight=" << state.in_flight_ranges()->size()
-                     << " #remaining=" << state.num_remaining_ranges();
-        return false;
-      }
-
-      // If we have an in_flight range, the reader must be on the queue or have a
-      // thread actively reading for it.
-      if (!state.in_flight_ranges()->empty() && !on_queue && num_reading_threads == 0) {
-        LOG(WARNING) << "disk_id=" << i
-                     << " reader has inflight ranges but is not on the disk queue."
-                     << " #in_flight_ranges=" << state.in_flight_ranges()->size()
-                     << " #reading_threads=" << num_reading_threads
-                     << " on_queue=" << on_queue;
-        return false;
-      }
-
-      if (state.done() && num_reading_threads > 0) {
-        LOG(WARNING) << "disk_id=" << i
-                     << " state set to done but there are still threads working."
-                     << " #reading_threads=" << num_reading_threads;
-        return false;
-      }
-    } else {
-      // Is Cancelled
-      if (!state.in_flight_ranges()->empty()) {
-        LOG(WARNING) << "disk_id=" << i
-                     << "Reader cancelled but has in flight ranges.";
-        return false;
-      }
-      if (!state.unstarted_scan_ranges()->empty()) {
-        LOG(WARNING) << "disk_id=" << i
-                     << "Reader cancelled but has unstarted ranges.";
-        return false;
-      }
-    }
-
-    if (state.done() && on_queue) {
-      LOG(WARNING) << "disk_id=" << i
-                   << " state set to done but the reader is still on the disk queue."
-                   << " state.done=true and state.is_on_queue=true";
-      return false;
-    }
-  }
-
-  if (state_ != DiskIoRequestContext::Cancelled) {
-    if (total_unstarted_ranges != num_unstarted_scan_ranges_.Load()) {
-      LOG(WARNING) << "total_unstarted_ranges=" << total_unstarted_ranges
-                   << " sum_in_states=" << num_unstarted_scan_ranges_.Load();
-      return false;
-    }
-  } else {
-    if (!ready_to_start_ranges_.empty()) {
-      LOG(WARNING) << "Reader cancelled but has ready to start ranges.";
-      return false;
-    }
-    if (!blocked_ranges_.empty()) {
-      LOG(WARNING) << "Reader cancelled but has blocked ranges.";
-      return false;
-    }
-  }
-
-  return true;
-}
-
-void DiskIoRequestContext::PerDiskState::ScheduleContext(
-    DiskIoRequestContext* context, int disk_id) {
-  if (!is_on_queue_ && !done_) {
-    is_on_queue_ = true;
-    context->parent_->disk_queues_[disk_id]->EnqueueContext(context);
-  }
-}
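
A detail worth preserving from the Cancel() implementation above is its locking
discipline: write callbacks are collected into a vector while lock_ is held but
invoked only after the lock is released, so a callback that re-enters the object
cannot deadlock. A minimal sketch of just that discipline follows, with
illustrative names and a plain int standing in for Status.

#include <functional>
#include <mutex>
#include <utility>
#include <vector>

class Cancellable {
 public:
  using Callback = std::function<void(int /* status */)>;

  void AddCallback(Callback cb) {
    std::lock_guard<std::mutex> l(lock_);
    if (!cancelled_) callbacks_.push_back(std::move(cb));
  }

  void Cancel(int status) {
    std::vector<Callback> to_run;
    {
      std::lock_guard<std::mutex> l(lock_);
      if (cancelled_) return;  // Already being cancelled; nothing to do.
      cancelled_ = true;
      to_run.swap(callbacks_);  // Take ownership while the lock is held.
    }
    // Run with no lock held: a callback may safely call back into *this.
    for (const Callback& cb : to_run) cb(status);
  }

 private:
  std::mutex lock_;
  bool cancelled_ = false;
  std::vector<Callback> callbacks_;
};

The same reasoning explains why Cancel() reschedules the context on every disk
before returning: the disk threads observe the Cancelled state under their own
locks and perform the remaining cleanup without the reader's lock being held.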
