http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/io/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
new file mode 100644
index 0000000..f9aed92
--- /dev/null
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -0,0 +1,1191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "common/global-flags.h"
+#include "runtime/io/disk-io-mgr.h"
+#include "runtime/io/disk-io-mgr-internal.h"
+#include "runtime/io/handle-cache.inline.h"
+
+#include <boost/algorithm/string.hpp>
+
+#include "gutil/strings/substitute.h"
+#include "util/bit-util.h"
+#include "util/hdfs-util.h"
+#include "util/time.h"
+
+DECLARE_bool(disable_mem_pools);
+#ifndef NDEBUG
+DECLARE_int32(stress_scratch_write_delay_ms);
+#endif
+
+#include "common/names.h"
+
+using namespace impala;
+using namespace impala::io;
+using namespace strings;
+
+// Control the number of disks on the machine.  If 0, this comes from the system
+// settings.
+DEFINE_int32(num_disks, 0, "Number of disks on data node.");
+// Default IoMgr configs:
+// The maximum number of threads per disk is also the max queue depth per disk.
+DEFINE_int32(num_threads_per_disk, 0, "Number of I/O threads per disk");
+
+// Rotational disks should have 1 thread per disk to minimize seeks.  Non-rotational
+// disks don't have this penalty and benefit from multiple concurrent IO requests.
+static const int THREADS_PER_ROTATIONAL_DISK = 1;
+static const int THREADS_PER_SOLID_STATE_DISK = 8;
+
+// The maximum number of threads per rotational disk is also the max queue depth per
+// rotational disk.
+static const string num_io_threads_per_rotational_disk_help_msg = Substitute("Number of "
+    "I/O threads per rotational disk. Has priority over num_threads_per_disk. If neither"
+    " is set, defaults to $0 thread(s) per rotational disk", THREADS_PER_ROTATIONAL_DISK);
+DEFINE_int32(num_io_threads_per_rotational_disk, 0,
+    num_io_threads_per_rotational_disk_help_msg.c_str());
+// The maximum number of threads per solid state disk is also the max queue depth per
+// solid state disk.
+static const string num_io_threads_per_solid_state_disk_help_msg = Substitute("Number of"
+    " I/O threads per solid state disk. Has priority over num_threads_per_disk. If "
+    "neither is set, defaults to $0 thread(s) per solid state disk",
+    THREADS_PER_SOLID_STATE_DISK);
+DEFINE_int32(num_io_threads_per_solid_state_disk, 0,
+    num_io_threads_per_solid_state_disk_help_msg.c_str());
+// The maximum number of remote HDFS I/O threads.  HDFS accesses that are expected to be
+// remote are placed on a separate remote disk queue.  This is the queue depth for that
+// queue.  If 0, then the remote queue is not used and instead ranges are round-robined
+// across the local disk queues.
+DEFINE_int32(num_remote_hdfs_io_threads, 8, "Number of remote HDFS I/O threads");
+// The maximum number of S3 I/O threads. The default value of 16 was chosen empirically
+// to maximize S3 throughput. Maximum throughput is achieved with multiple connections
+// open to S3 and use of multiple CPU cores since S3 reads are relatively compute
+// expensive (SSL and JNI buffer overheads).
+DEFINE_int32(num_s3_io_threads, 16, "Number of S3 I/O threads");
+// The maximum number of ADLS I/O threads. This number is a good default to have for
+// clusters that may vary widely in size, due to an undocumented concurrency limit
+// enforced by ADLS for a cluster, which spans between 500-700. For smaller clusters
+// (~10 nodes), 64 threads would be more ideal.
+DEFINE_int32(num_adls_io_threads, 16, "Number of ADLS I/O threads");
+
+DECLARE_int64(min_buffer_size);
+
+// With 1024B through 8MB buffers, this is up to ~2GB of buffers.
+DEFINE_int32(max_free_io_buffers, 128,
+    "For each io buffer size, the maximum number of buffers the IoMgr will hold onto");
+
+// The number of cached file handles defines how much memory can be used per backend for
+// caching frequently used file handles. Measurements indicate that a single file handle
+// uses about 6kB of memory. 20k file handles will thus reserve ~120MB of memory.
+// The actual amount of memory that is associated with a file handle can be larger
+// or smaller, depending on the replication factor for this file or the path name.
+DEFINE_uint64(max_cached_file_handles, 20000, "Maximum number of HDFS file handles "
+    "that will be cached. Disabled if set to 0.");
+
+// The unused file handle timeout specifies how long a file handle will remain in the
+// cache if it is not being used. Aging out unused handles ensures that the cache is not
+// wasting memory on handles that aren't useful. This allows users to specify a larger
+// cache size, as the system will only use the memory on useful file handles.
+// Additionally, cached file handles keep an open file descriptor for local files.
+// If a file is deleted through HDFS, this open file descriptor can keep the disk space
+// from being freed. When the metadata sees that a file has been deleted, the file handle
+// will no longer be used by future queries. Aging out this file handle allows the
+// disk space to be freed in an appropriate period of time.
+DEFINE_uint64(unused_file_handle_timeout_sec, 21600, "Maximum time, in seconds, that an "
+    "unused HDFS file handle will remain in the file handle cache. Disabled if set "
+    "to 0.");
+
+// The IoMgr is able to run with a wide range of memory usage. If a query has less
+// memory remaining than this value, the IoMgr will stop all buffering regardless of
+// the current queue size.
+static const int LOW_MEMORY = 64 * 1024 * 1024;
+
+const int DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT;
+
+AtomicInt32 DiskIoMgr::next_disk_id_;
+
+namespace detail {
+// Indicates if file handle caching should be used
+static inline bool is_file_handle_caching_enabled() {
+  return FLAGS_max_cached_file_handles > 0;
+}
+}
+
+string DiskIoMgr::DebugString() {
+  stringstream ss;
+  ss << "Disks: " << endl;
+  for (int i = 0; i < disk_queues_.size(); ++i) {
+    unique_lock<mutex> lock(disk_queues_[i]->lock);
+    ss << "  " << (void*) disk_queues_[i] << ":" ;
+    if (!disk_queues_[i]->request_contexts.empty()) {
+      ss << " Readers: ";
+      for (RequestContext* req_context: disk_queues_[i]->request_contexts) {
+        ss << (void*)req_context;
+      }
+    }
+    ss << endl;
+  }
+  return ss.str();
+}
+
+BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr,
+    RequestContext* reader, ScanRange* scan_range, uint8_t* buffer,
+    int64_t buffer_len, MemTracker* mem_tracker)
+  : io_mgr_(io_mgr),
+    reader_(reader),
+    mem_tracker_(mem_tracker),
+    scan_range_(scan_range),
+    buffer_(buffer),
+    buffer_len_(buffer_len) {
+  DCHECK(io_mgr != nullptr);
+  DCHECK(scan_range != nullptr);
+  DCHECK(buffer != nullptr);
+  DCHECK_GE(buffer_len, 0);
+  DCHECK_NE(scan_range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER,
+      mem_tracker == nullptr);
+}
+
+void BufferDescriptor::TransferOwnership(MemTracker* dst) {
+  DCHECK(dst != nullptr);
+  DCHECK(!is_client_buffer());
+  // Memory of cached buffers is not tracked against a tracker.
+  if (is_cached()) return;
+  DCHECK(mem_tracker_ != nullptr);
+  dst->Consume(buffer_len_);
+  mem_tracker_->Release(buffer_len_);
+  mem_tracker_ = dst;
+}
+
+WriteRange::WriteRange(
+    const string& file, int64_t file_offset, int disk_id, WriteDoneCallback callback)
+  : RequestRange(RequestType::WRITE), callback_(callback) {
+  SetRange(file, file_offset, disk_id);
+}
+
+void WriteRange::SetRange(
+    const std::string& file, int64_t file_offset, int disk_id) {
+  file_ = file;
+  offset_ = file_offset;
+  disk_id_ = disk_id;
+}
+
+void WriteRange::SetData(const uint8_t* buffer, int64_t len) {
+  data_ = buffer;
+  len_ = len;
+}
+
+static void CheckSseSupport() {
+  if (!CpuInfo::IsSupported(CpuInfo::SSE4_2)) {
+    LOG(WARNING) << "This machine does not support sse4_2.  The default IO system "
+                    "configurations are suboptimal for this hardware.  Consider "
+                    "increasing the number of threads per disk by restarting impalad "
+                    "using the --num_threads_per_disk flag with a higher value";
+  }
+}
+
+// Utility function to select flag that is set (has a positive value) based on precedence
+static inline int GetFirstPositiveVal(const int first_val, const int second_val,
+    const int default_val) {
+  return first_val > 0 ? first_val : (second_val > 0 ? second_val : default_val);
+}
+
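
(Aside: a concrete illustration of this precedence, with assumed flag values rather than anything from the patch itself: if --num_io_threads_per_rotational_disk is unset (0) and --num_threads_per_disk=4, GetFirstPositiveVal(0, 4, THREADS_PER_ROTATIONAL_DISK) resolves to 4 I/O threads per rotational disk; with both flags unset it falls back to the default of 1.)
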
+DiskIoMgr::DiskIoMgr() :
+    num_io_threads_per_rotational_disk_(GetFirstPositiveVal(
+        FLAGS_num_io_threads_per_rotational_disk, FLAGS_num_threads_per_disk,
+        THREADS_PER_ROTATIONAL_DISK)),
+    num_io_threads_per_solid_state_disk_(GetFirstPositiveVal(
+        FLAGS_num_io_threads_per_solid_state_disk, FLAGS_num_threads_per_disk,
+        THREADS_PER_SOLID_STATE_DISK)),
+    max_buffer_size_(FLAGS_read_size),
+    min_buffer_size_(FLAGS_min_buffer_size),
+    shut_down_(false),
+    total_bytes_read_counter_(TUnit::BYTES),
+    read_timer_(TUnit::TIME_NS),
+    file_handle_cache_(min(FLAGS_max_cached_file_handles,
+        FileSystemUtil::MaxNumFileHandles()),
+        FLAGS_unused_file_handle_timeout_sec) {
+  DCHECK_LE(READ_SIZE_MIN_VALUE, FLAGS_read_size);
+  int64_t max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, min_buffer_size_);
+  free_buffers_.resize(BitUtil::Log2Ceiling64(max_buffer_size_scaled) + 1);
+  int num_local_disks = DiskInfo::num_disks();
+  if (FLAGS_num_disks < 0 || FLAGS_num_disks > DiskInfo::num_disks()) {
+    LOG(WARNING) << "Number of disks specified should be between 0 and the number of "
+        "logical disks on the system. Defaulting to system setting of " <<
+        DiskInfo::num_disks() << " disks";
+  } else if (FLAGS_num_disks > 0) {
+    num_local_disks = FLAGS_num_disks;
+  }
+  disk_queues_.resize(num_local_disks + REMOTE_NUM_DISKS);
+  CheckSseSupport();
+}
+
+DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_rotational_disk,
+    int threads_per_solid_state_disk, int min_buffer_size, int max_buffer_size) :
+    num_io_threads_per_rotational_disk_(threads_per_rotational_disk),
+    num_io_threads_per_solid_state_disk_(threads_per_solid_state_disk),
+    max_buffer_size_(max_buffer_size),
+    min_buffer_size_(min_buffer_size),
+    shut_down_(false),
+    total_bytes_read_counter_(TUnit::BYTES),
+    read_timer_(TUnit::TIME_NS),
+    file_handle_cache_(min(FLAGS_max_cached_file_handles,
+        FileSystemUtil::MaxNumFileHandles()),
+        FLAGS_unused_file_handle_timeout_sec) {
+  int64_t max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, min_buffer_size_);
+  free_buffers_.resize(BitUtil::Log2Ceiling64(max_buffer_size_scaled) + 1);
+  if (num_local_disks == 0) num_local_disks = DiskInfo::num_disks();
+  disk_queues_.resize(num_local_disks + REMOTE_NUM_DISKS);
+  CheckSseSupport();
+}
+
+DiskIoMgr::~DiskIoMgr() {
+  shut_down_ = true;
+  // Notify all worker threads and shut them down.
+  for (int i = 0; i < disk_queues_.size(); ++i) {
+    if (disk_queues_[i] == nullptr) continue;
+    {
+      // This lock is necessary to properly use the condition var to notify
+      // the disk worker threads.  The readers also grab this lock so updates
+      // to shut_down_ are protected.
+      unique_lock<mutex> disk_lock(disk_queues_[i]->lock);
+    }
+    disk_queues_[i]->work_available.NotifyAll();
+  }
+  disk_thread_group_.JoinAll();
+
+  for (int i = 0; i < disk_queues_.size(); ++i) {
+    if (disk_queues_[i] == nullptr) continue;
+    int disk_id = disk_queues_[i]->disk_id;
+    for (list<RequestContext*>::iterator it = disk_queues_[i]->request_contexts.begin();
+        it != disk_queues_[i]->request_contexts.end(); ++it) {
+      DCHECK_EQ((*it)->disk_states_[disk_id].num_threads_in_op(), 0);
+      DCHECK((*it)->disk_states_[disk_id].done());
+      (*it)->DecrementDiskRefCount();
+    }
+  }
+
+  DCHECK_EQ(num_buffers_in_readers_.Load(), 0);
+
+  // Delete all allocated buffers
+  int num_free_buffers = 0;
+  for (int idx = 0; idx < free_buffers_.size(); ++idx) {
+    num_free_buffers += free_buffers_[idx].size();
+  }
+  DCHECK_EQ(num_allocated_buffers_.Load(), num_free_buffers);
+  GcIoBuffers();
+
+  for (int i = 0; i < disk_queues_.size(); ++i) {
+    delete disk_queues_[i];
+  }
+
+  if (free_buffer_mem_tracker_ != nullptr) free_buffer_mem_tracker_->Close();
+  if (cached_read_options_ != nullptr) hadoopRzOptionsFree(cached_read_options_);
+}
+
+Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
+  DCHECK(process_mem_tracker != nullptr);
+  free_buffer_mem_tracker_.reset(
+      new MemTracker(-1, "Free Disk IO Buffers", process_mem_tracker, false));
+
+  for (int i = 0; i < disk_queues_.size(); ++i) {
+    disk_queues_[i] = new DiskQueue(i);
+    int num_threads_per_disk;
+    if (i == RemoteDfsDiskId()) {
+      num_threads_per_disk = FLAGS_num_remote_hdfs_io_threads;
+    } else if (i == RemoteS3DiskId()) {
+      num_threads_per_disk = FLAGS_num_s3_io_threads;
+    } else if (i == RemoteAdlsDiskId()) {
+      num_threads_per_disk = FLAGS_num_adls_io_threads;
+    } else if (DiskInfo::is_rotational(i)) {
+      num_threads_per_disk = num_io_threads_per_rotational_disk_;
+    } else {
+      num_threads_per_disk = num_io_threads_per_solid_state_disk_;
+    }
+    for (int j = 0; j < num_threads_per_disk; ++j) {
+      stringstream ss;
+      ss << "work-loop(Disk: " << i << ", Thread: " << j << ")";
+      std::unique_ptr<Thread> t;
+      RETURN_IF_ERROR(Thread::Create("disk-io-mgr", ss.str(), &DiskIoMgr::WorkLoop,
+          this, disk_queues_[i], &t));
+      disk_thread_group_.AddThread(move(t));
+    }
+  }
+  RETURN_IF_ERROR(file_handle_cache_.Init());
+
+  cached_read_options_ = hadoopRzOptionsAlloc();
+  DCHECK(cached_read_options_ != nullptr);
+  // Disable checksumming for cached reads.
+  int ret = hadoopRzOptionsSetSkipChecksum(cached_read_options_, true);
+  DCHECK_EQ(ret, 0);
+  // Disable automatic fallback for cached reads.
+  ret = hadoopRzOptionsSetByteBufferPool(cached_read_options_, nullptr);
+  DCHECK_EQ(ret, 0);
+
+  return Status::OK();
+}
+
+unique_ptr<RequestContext> DiskIoMgr::RegisterContext(MemTracker* mem_tracker) {
+  return unique_ptr<RequestContext>(
+      new RequestContext(this, num_total_disks(), mem_tracker));
+}
+
+void DiskIoMgr::UnregisterContext(RequestContext* reader) {
+  reader->CancelAndMarkInactive();
+}
+
+// Cancellation requires coordination from multiple threads.  Each thread that currently
+// has a reference to the request context must notice the cancel and remove it from its
+// tracking structures.  The last thread to touch the context should deallocate (aka
+// recycle) the request context object.  Potential threads are:
+//  1. Disk threads that are currently reading for this reader.
+//  2. Caller threads that are waiting in GetNext.
+//
+// The steps are:
+// 1. Cancel will immediately set the context in the Cancelled state.  This prevents any
+// other thread from adding more ready buffers to the context (they all take a lock and
+// check the state before doing so), or any write ranges to the context.
+// 2. Cancel will call cancel on each ScanRange that is not yet complete, unblocking
+// any threads in GetNext(). The reader will see the cancelled Status returned. Cancel
+// also invokes the callback for the WriteRanges with the cancelled state.
+// 3. Disk threads notice the context is cancelled either when picking the next context
+// to process or when they try to enqueue a ready buffer.  Upon noticing the cancelled
+// state, the thread removes the context from the disk queue.  The last thread per disk
+// with an outstanding reference to the context decrements the number of disk queues the
+// context is on.
+void DiskIoMgr::CancelContext(RequestContext* context) {
+  context->Cancel(Status::CANCELLED);
+}
+
+void DiskIoMgr::set_read_timer(RequestContext* r, RuntimeProfile::Counter* c) {
+  r->read_timer_ = c;
+}
+
+void DiskIoMgr::set_bytes_read_counter(RequestContext* r, RuntimeProfile::Counter* c) {
+  r->bytes_read_counter_ = c;
+}
+
+void DiskIoMgr::set_active_read_thread_counter(RequestContext* r,
+    RuntimeProfile::Counter* c) {
+  r->active_read_thread_counter_ = c;
+}
+
+void DiskIoMgr::set_disks_access_bitmap(RequestContext* r,
+    RuntimeProfile::Counter* c) {
+  r->disks_accessed_bitmap_ = c;
+}
+
+int64_t DiskIoMgr::queue_size(RequestContext* reader) const {
+  return reader->num_ready_buffers_.Load();
+}
+
+Status DiskIoMgr::context_status(RequestContext* context) const {
+  unique_lock<mutex> lock(context->lock_);
+  return context->status_;
+}
+
+int64_t DiskIoMgr::bytes_read_local(RequestContext* reader) const {
+  return reader->bytes_read_local_.Load();
+}
+
+int64_t DiskIoMgr::bytes_read_short_circuit(RequestContext* reader) const {
+  return reader->bytes_read_short_circuit_.Load();
+}
+
+int64_t DiskIoMgr::bytes_read_dn_cache(RequestContext* reader) const {
+  return reader->bytes_read_dn_cache_.Load();
+}
+
+int DiskIoMgr::num_remote_ranges(RequestContext* reader) const {
+  return reader->num_remote_ranges_.Load();
+}
+
+int64_t DiskIoMgr::unexpected_remote_bytes(RequestContext* reader) const {
+  return reader->unexpected_remote_bytes_.Load();
+}
+
+int DiskIoMgr::cached_file_handles_hit_count(RequestContext* reader) const {
+  return reader->cached_file_handles_hit_count_.Load();
+}
+
+int DiskIoMgr::cached_file_handles_miss_count(RequestContext* reader) const {
+  return reader->cached_file_handles_miss_count_.Load();
+}
+
+int64_t DiskIoMgr::GetReadThroughput() {
+  return RuntimeProfile::UnitsPerSecond(&total_bytes_read_counter_, &read_timer_);
+}
+
+Status DiskIoMgr::ValidateScanRange(ScanRange* range) {
+  int disk_id = range->disk_id_;
+  if (disk_id < 0 || disk_id >= disk_queues_.size()) {
+    return Status(TErrorCode::DISK_IO_ERROR,
+        Substitute("Invalid scan range.  Bad disk id: $0", disk_id));
+  }
+  if (range->offset_ < 0) {
+    return Status(TErrorCode::DISK_IO_ERROR,
+        Substitute("Invalid scan range. Negative offset $0", range->offset_));
+  }
+  if (range->len_ < 0) {
+    return Status(TErrorCode::DISK_IO_ERROR,
+        Substitute("Invalid scan range. Negative length $0", range->len_));
+  }
+  return Status::OK();
+}
+
+Status DiskIoMgr::AddScanRanges(RequestContext* reader,
+    const vector<ScanRange*>& ranges, bool schedule_immediately) {
+  if (ranges.empty()) return Status::OK();
+
+  // Validate and initialize all ranges
+  for (int i = 0; i < ranges.size(); ++i) {
+    RETURN_IF_ERROR(ValidateScanRange(ranges[i]));
+    ranges[i]->InitInternal(this, reader);
+  }
+
+  // disks that this reader needs to be scheduled on.
+  unique_lock<mutex> reader_lock(reader->lock_);
+  DCHECK(reader->Validate()) << endl << reader->DebugString();
+
+  if (reader->state_ == RequestContext::Cancelled) {
+    DCHECK(!reader->status_.ok());
+    return reader->status_;
+  }
+
+  // Add each range to the queue of the disk the range is on
+  for (int i = 0; i < ranges.size(); ++i) {
+    // Don't add empty ranges.
+    DCHECK_NE(ranges[i]->len(), 0);
+    ScanRange* range = ranges[i];
+
+    if (range->try_cache_) {
+      if (schedule_immediately) {
+        bool cached_read_succeeded;
+        RETURN_IF_ERROR(range->ReadFromCache(reader_lock, &cached_read_succeeded));
+        if (cached_read_succeeded) continue;
+        // Cached read failed, fall back to AddRequestRange() below.
+      } else {
+        reader->cached_ranges_.Enqueue(range);
+        continue;
+      }
+    }
+    reader->AddRequestRange(range, schedule_immediately);
+  }
+  DCHECK(reader->Validate()) << endl << reader->DebugString();
+
+  return Status::OK();
+}
+
+Status DiskIoMgr::AddScanRange(
+    RequestContext* reader, ScanRange* range, bool schedule_immediately) {
+  return AddScanRanges(reader, vector<ScanRange*>({range}), schedule_immediately);
+}
+
+// This function returns the next scan range the reader should work on, checking
+// for eos and error cases. If there isn't already a cached scan range or a scan
+// range prepared by the disk threads, the caller waits on the disk threads.
+Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) {
+  DCHECK(reader != nullptr);
+  DCHECK(range != nullptr);
+  *range = nullptr;
+  Status status = Status::OK();
+
+  unique_lock<mutex> reader_lock(reader->lock_);
+  DCHECK(reader->Validate()) << endl << reader->DebugString();
+
+  while (true) {
+    if (reader->state_ == RequestContext::Cancelled) {
+      DCHECK(!reader->status_.ok());
+      status = reader->status_;
+      break;
+    }
+
+    if (reader->num_unstarted_scan_ranges_.Load() == 0 &&
+        reader->ready_to_start_ranges_.empty() && reader->cached_ranges_.empty()) {
+      // All ranges are done, just return.
+      break;
+    }
+
+    if (!reader->cached_ranges_.empty()) {
+      // We have a cached range.
+      *range = reader->cached_ranges_.Dequeue();
+      DCHECK((*range)->try_cache_);
+      bool cached_read_succeeded;
+      RETURN_IF_ERROR((*range)->ReadFromCache(reader_lock, &cached_read_succeeded));
+      if (cached_read_succeeded) return Status::OK();
+
+      // This range ended up not being cached. Loop again and pick up a new range.
+      reader->AddRequestRange(*range, false);
+      DCHECK(reader->Validate()) << endl << reader->DebugString();
+      *range = nullptr;
+      continue;
+    }
+
+    if (reader->ready_to_start_ranges_.empty()) {
+      reader->ready_to_start_ranges_cv_.Wait(reader_lock);
+    } else {
+      *range = reader->ready_to_start_ranges_.Dequeue();
+      DCHECK(*range != nullptr);
+      int disk_id = (*range)->disk_id();
+      DCHECK_EQ(*range, reader->disk_states_[disk_id].next_scan_range_to_start());
+      // Set this to nullptr; the next time this disk runs for this reader, it will
+      // get another range ready.
+      reader->disk_states_[disk_id].set_next_scan_range_to_start(nullptr);
+      reader->ScheduleScanRange(*range);
+      break;
+    }
+  }
+  return status;
+}
+
+Status DiskIoMgr::Read(RequestContext* reader,
+    ScanRange* range, std::unique_ptr<BufferDescriptor>* buffer) {
+  DCHECK(range != nullptr);
+  DCHECK(buffer != nullptr);
+  *buffer = nullptr;
+
+  if (range->len() > max_buffer_size_
+      && range->external_buffer_tag_ != ScanRange::ExternalBufferTag::CLIENT_BUFFER) {
+    return Status(TErrorCode::DISK_IO_ERROR, Substitute("Internal error: cannot "
+        "perform sync read of '$0' bytes that is larger than the max read buffer size "
+        "'$1'.", range->len(), max_buffer_size_));
+  }
+
+  vector<ScanRange*> ranges;
+  ranges.push_back(range);
+  RETURN_IF_ERROR(AddScanRanges(reader, ranges, true));
+  RETURN_IF_ERROR(range->GetNext(buffer));
+  DCHECK((*buffer) != nullptr);
+  DCHECK((*buffer)->eosr());
+  return Status::OK();
+}
+
+void DiskIoMgr::ReturnBuffer(unique_ptr<BufferDescriptor> buffer_desc) {
+  DCHECK(buffer_desc != nullptr);
+  if (!buffer_desc->status_.ok()) DCHECK(buffer_desc->buffer_ == nullptr);
+
+  RequestContext* reader = buffer_desc->reader_;
+  if (buffer_desc->buffer_ != nullptr) {
+    if (!buffer_desc->is_cached() && !buffer_desc->is_client_buffer()) {
+      // Buffers that were not allocated by DiskIoMgr don't need to be freed.
+      FreeBufferMemory(buffer_desc.get());
+    }
+    buffer_desc->buffer_ = nullptr;
+    num_buffers_in_readers_.Add(-1);
+    reader->num_buffers_in_reader_.Add(-1);
+  } else {
+    // A nullptr buffer means there was an error in which case there is no buffer
+    // to return.
+  }
+
+  if (buffer_desc->eosr_ || buffer_desc->scan_range_->is_cancelled_) {
+    // Need to close the scan range if returning the last buffer or the scan range
+    // has been cancelled (and the caller might never get the last buffer).
+    // Close() is idempotent, so multiple cancelled buffers are okay.
+    buffer_desc->scan_range_->Close();
+  }
+}
+
+unique_ptr<BufferDescriptor> DiskIoMgr::GetFreeBuffer(
+    RequestContext* reader, ScanRange* range, int64_t buffer_size) {
+  DCHECK_LE(buffer_size, max_buffer_size_);
+  DCHECK_GT(buffer_size, 0);
+  buffer_size = min(static_cast<int64_t>(max_buffer_size_), buffer_size);
+  int idx = free_buffers_idx(buffer_size);
+  // Quantize buffer size to the nearest power of 2 greater than or equal to the
+  // specified buffer size and convert to bytes.
+  buffer_size = (1LL << idx) * min_buffer_size_;
+
+  // Track memory against the reader. This is checked the next time we start
+  // a read for the next reader in DiskIoMgr::GetNextScanRange().
+  DCHECK(reader->mem_tracker_ != nullptr);
+  reader->mem_tracker_->Consume(buffer_size);
+
+  uint8_t* buffer = nullptr;
+  {
+    unique_lock<mutex> lock(free_buffers_lock_);
+    if (free_buffers_[idx].empty()) {
+      num_allocated_buffers_.Add(1);
+      if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != nullptr) {
+        ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(1L);
+      }
+      if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != nullptr) {
+        ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(buffer_size);
+      }
+      // We already tracked this memory against the reader's MemTracker.
+      buffer = new uint8_t[buffer_size];
+    } else {
+      if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != nullptr) {
+        ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(-1L);
+      }
+      buffer = free_buffers_[idx].front();
+      free_buffers_[idx].pop_front();
+      free_buffer_mem_tracker_->Release(buffer_size);
+    }
+  }
+
+  // Validate more invariants.
+  DCHECK(range != nullptr);
+  DCHECK(reader != nullptr);
+  DCHECK(buffer != nullptr);
+  return unique_ptr<BufferDescriptor>(new BufferDescriptor(
+      this, reader, range, buffer, buffer_size, reader->mem_tracker_));
+}
+
+void DiskIoMgr::GcIoBuffers(int64_t bytes_to_free) {
+  unique_lock<mutex> lock(free_buffers_lock_);
+  int buffers_freed = 0;
+  int bytes_freed = 0;
+  // Free small-to-large to avoid retaining many small buffers and fragmenting memory.
+  for (int idx = 0; idx < free_buffers_.size(); ++idx) {
+    deque<uint8_t*>* free_buffers = &free_buffers_[idx];
+    while (
+        !free_buffers->empty() && (bytes_to_free == -1 || bytes_freed <= bytes_to_free)) {
+      uint8_t* buffer = free_buffers->front();
+      free_buffers->pop_front();
+      int64_t buffer_size = (1LL << idx) * min_buffer_size_;
+      delete[] buffer;
+      free_buffer_mem_tracker_->Release(buffer_size);
+      num_allocated_buffers_.Add(-1);
+
+      ++buffers_freed;
+      bytes_freed += buffer_size;
+    }
+    if (bytes_to_free != -1 && bytes_freed >= bytes_to_free) break;
+  }
+
+  if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != nullptr) {
+    ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(-buffers_freed);
+  }
+  if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != nullptr) {
+    ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(-bytes_freed);
+  }
+  if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != nullptr) {
+    ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(-buffers_freed);
+  }
+}
+
+void DiskIoMgr::FreeBufferMemory(BufferDescriptor* desc) {
+  DCHECK(!desc->is_cached());
+  DCHECK(!desc->is_client_buffer());
+  uint8_t* buffer = desc->buffer_;
+  int64_t buffer_size = desc->buffer_len_;
+  int idx = free_buffers_idx(buffer_size);
+  DCHECK_EQ(BitUtil::Ceil(buffer_size, min_buffer_size_) & ~(1LL << idx), 0)
+      << "buffer_size_ / min_buffer_size_ should be power of 2, got buffer_size = "
+      << buffer_size << ", min_buffer_size_ = " << min_buffer_size_;
+
+  {
+    unique_lock<mutex> lock(free_buffers_lock_);
+    if (!FLAGS_disable_mem_pools &&
+        free_buffers_[idx].size() < FLAGS_max_free_io_buffers) {
+      free_buffers_[idx].push_back(buffer);
+      if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != nullptr) {
+        ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(1L);
+      }
+      // This consume call needs to be protected by 'free_buffers_lock_' to avoid a race
+      // with a Release() call for the same buffer that could make consumption negative.
+      // Note: we can't use TryConsume(), which can indirectly call GcIoBuffers().
+      // TODO: after IMPALA-3200 is completed, we should be able to leverage the buffer
+      // pool's free lists, and remove these free lists.
+      free_buffer_mem_tracker_->Consume(buffer_size);
+    } else {
+      num_allocated_buffers_.Add(-1);
+      delete[] buffer;
+      if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != nullptr) {
+        ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(-1L);
+      }
+      if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != nullptr) {
+        ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(-buffer_size);
+      }
+    }
+  }
+
+  // We transferred the buffer ownership from the BufferDescriptor to the DiskIoMgr.
+  desc->mem_tracker_->Release(buffer_size);
+  desc->buffer_ = nullptr;
+}
+
+// This function gets the next RequestRange to work on for this disk. It checks for
+// cancellation and
+// a) Updates ready_to_start_ranges if there are no scan ranges queued for this disk.
+// b) Adds an unstarted write range to in_flight_ranges_. The write range is processed
+//    immediately if there are no preceding scan ranges in in_flight_ranges_.
+// It blocks until work is available or the thread is shut down.
+// Work is available if there is a RequestContext with
+//  - A ScanRange with a buffer available, or
+//  - A WriteRange in unstarted_write_ranges_.
+bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
+    RequestContext** request_context) {
+  int disk_id = disk_queue->disk_id;
+  *range = nullptr;
+
+  // This loop returns either with work to do or when the disk IoMgr shuts down.
+  while (true) {
+    *request_context = nullptr;
+    RequestContext::PerDiskState* request_disk_state = nullptr;
+    {
+      unique_lock<mutex> disk_lock(disk_queue->lock);
+
+      while (!shut_down_ && disk_queue->request_contexts.empty()) {
+        // wait if there are no readers on the queue
+        disk_queue->work_available.Wait(disk_lock);
+      }
+      if (shut_down_) break;
+      DCHECK(!disk_queue->request_contexts.empty());
+
+      // Get the next reader and remove the reader so that another disk thread
+      // can't pick it up.  It will be enqueued before issuing the read to HDFS
+      // so this is not a big deal (i.e. multiple disk threads can read for the
+      // same reader).
+      // TODO: revisit.
+      *request_context = disk_queue->request_contexts.front();
+      disk_queue->request_contexts.pop_front();
+      DCHECK(*request_context != nullptr);
+      request_disk_state = &((*request_context)->disk_states_[disk_id]);
+      request_disk_state->IncrementRequestThreadAndDequeue();
+    }
+
+    // NOTE: no locks were taken in between.  We need to be careful about what state
+    // could have changed to the reader and disk in between.
+    // There are some invariants here.  Only one disk thread can have the
+    // same reader here (the reader is removed from the queue).  There can be
+    // other disk threads operating on this reader in other functions though.
+
+    // We just picked a reader. Before we may allocate a buffer on its behalf, check that
+    // it has not exceeded any memory limits (e.g. the query or process limit).
+    // TODO: once IMPALA-3200 is fixed, we should be able to remove the free lists and
+    // move these memory limit checks to GetFreeBuffer().
+    // Note that calling AnyLimitExceeded() can result in a call to GcIoBuffers().
+    // TODO: IMPALA-3209: we should not force a reader over its memory limit by
+    // pushing more buffers to it. Most readers can make progress and operate within
+    // a fixed memory limit.
+    if ((*request_context)->mem_tracker_ != nullptr
+        && (*request_context)->mem_tracker_->AnyLimitExceeded()) {
+      (*request_context)->Cancel(Status::MemLimitExceeded());
+    }
+
+    unique_lock<mutex> request_lock((*request_context)->lock_);
+    VLOG_FILE << "Disk (id=" << disk_id << ") reading for "
+        << (*request_context)->DebugString();
+
+    // Check if reader has been cancelled
+    if ((*request_context)->state_ == RequestContext::Cancelled) {
+      request_disk_state->DecrementRequestThreadAndCheckDone(*request_context);
+      continue;
+    }
+
+    DCHECK_EQ((*request_context)->state_, RequestContext::Active)
+        << (*request_context)->DebugString();
+
+    if (request_disk_state->next_scan_range_to_start() == nullptr &&
+        !request_disk_state->unstarted_scan_ranges()->empty()) {
+      // We don't have a range queued for this disk for what the caller should
+      // read next. Populate that.  We want to have one range waiting to minimize
+      // wait time in GetNextRange.
+      ScanRange* new_range = request_disk_state->unstarted_scan_ranges()->Dequeue();
+      (*request_context)->num_unstarted_scan_ranges_.Add(-1);
+      (*request_context)->ready_to_start_ranges_.Enqueue(new_range);
+      request_disk_state->set_next_scan_range_to_start(new_range);
+
+      if ((*request_context)->num_unstarted_scan_ranges_.Load() == 0) {
+        // All the ranges have been started, notify everyone blocked on GetNextRange.
+        // Only one of them will get work so make sure to return nullptr to the other
+        // caller threads.
+        (*request_context)->ready_to_start_ranges_cv_.NotifyAll();
+      } else {
+        (*request_context)->ready_to_start_ranges_cv_.NotifyOne();
+      }
+    }
+
+    // Always enqueue a WriteRange to be processed into in_flight_ranges_.
+    // This is done so in_flight_ranges_ does not exclusively contain ScanRanges.
+    // For now, enqueuing a WriteRange on each invocation of GetNextRequestRange()
+    // does not flood in_flight_ranges() with WriteRanges because the entire
+    // WriteRange is processed and removed from the queue after GetNextRequestRange()
+    // returns. (A DCHECK is used to ensure that writes do not exceed 8MB).
+    if (!request_disk_state->unstarted_write_ranges()->empty()) {
+      WriteRange* write_range = request_disk_state->unstarted_write_ranges()->Dequeue();
+      request_disk_state->in_flight_ranges()->Enqueue(write_range);
+    }
+
+    // Get the next scan range to work on from the reader. Only in_flight_ranges
+    // are eligible since the disk threads do not start new ranges on their own.
+
+    // There are no inflight ranges, nothing to do.
+    if (request_disk_state->in_flight_ranges()->empty()) {
+      request_disk_state->DecrementRequestThread();
+      continue;
+    }
+    DCHECK_GT(request_disk_state->num_remaining_ranges(), 0);
+    *range = request_disk_state->in_flight_ranges()->Dequeue();
+    DCHECK(*range != nullptr);
+
+    // Now that we've picked a request range, put the context back on the queue so
+    // another thread can pick up another request range for this context.
+    request_disk_state->ScheduleContext(*request_context, disk_id);
+    DCHECK((*request_context)->Validate()) << endl << (*request_context)->DebugString();
+    return true;
+  }
+
+  DCHECK(shut_down_);
+  return false;
+}
+
+void DiskIoMgr::HandleWriteFinished(
+    RequestContext* writer, WriteRange* write_range, const Status& write_status) {
+  // Copy disk_id before running callback: the callback may modify write_range.
+  int disk_id = write_range->disk_id_;
+
+  // Execute the callback before decrementing the thread count. Otherwise CancelContext()
+  // that waits for the disk ref count to be 0 will return, creating a race, e.g. see
+  // IMPALA-1890.
+  // The status of the write does not affect the status of the writer context.
+  write_range->callback_(write_status);
+  {
+    unique_lock<mutex> writer_lock(writer->lock_);
+    DCHECK(writer->Validate()) << endl << writer->DebugString();
+    RequestContext::PerDiskState& state = writer->disk_states_[disk_id];
+    if (writer->state_ == RequestContext::Cancelled) {
+      state.DecrementRequestThreadAndCheckDone(writer);
+    } else {
+      state.DecrementRequestThread();
+    }
+    --state.num_remaining_ranges();
+  }
+}
+
+void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, RequestContext* reader,
+    unique_ptr<BufferDescriptor> buffer) {
+  unique_lock<mutex> reader_lock(reader->lock_);
+
+  RequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
+  DCHECK(reader->Validate()) << endl << reader->DebugString();
+  DCHECK_GT(state.num_threads_in_op(), 0);
+  DCHECK(buffer->buffer_ != nullptr);
+
+  if (reader->state_ == RequestContext::Cancelled) {
+    state.DecrementRequestThreadAndCheckDone(reader);
+    DCHECK(reader->Validate()) << endl << reader->DebugString();
+    if (!buffer->is_client_buffer()) FreeBufferMemory(buffer.get());
+    buffer->buffer_ = nullptr;
+    ScanRange* scan_range = buffer->scan_range_;
+    scan_range->Cancel(reader->status_);
+    // Enqueue the buffer to use the scan range's buffer cleanup path.
+    scan_range->EnqueueBuffer(reader_lock, move(buffer));
+    return;
+  }
+
+  DCHECK_EQ(reader->state_, RequestContext::Active);
+  DCHECK(buffer->buffer_ != nullptr);
+
+  // Update the reader's scan ranges.  There are three cases here:
+  //  1. Read error
+  //  2. End of scan range
+  //  3. Middle of scan range
+  if (!buffer->status_.ok()) {
+    // Error case
+    if (!buffer->is_client_buffer()) FreeBufferMemory(buffer.get());
+    buffer->buffer_ = nullptr;
+    buffer->eosr_ = true;
+    --state.num_remaining_ranges();
+    buffer->scan_range_->Cancel(buffer->status_);
+  } else if (buffer->eosr_) {
+    --state.num_remaining_ranges();
+  }
+
+  // After calling EnqueueBuffer(), it is no longer valid to read from buffer.
+  // Store the state we need before calling EnqueueBuffer().
+  bool eosr = buffer->eosr_;
+  ScanRange* scan_range = buffer->scan_range_;
+  bool is_cached = buffer->is_cached();
+  bool queue_full = scan_range->EnqueueBuffer(reader_lock, move(buffer));
+  if (eosr) {
+    // For cached buffers, we can't close the range until the cached buffer is returned.
+    // Close() is called from DiskIoMgr::ReturnBuffer().
+    if (!is_cached) scan_range->Close();
+  } else {
+    if (queue_full) {
+      reader->blocked_ranges_.Enqueue(scan_range);
+    } else {
+      reader->ScheduleScanRange(scan_range);
+    }
+  }
+  state.DecrementRequestThread();
+}
+
+void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
+  // The thread waits until there is work or the entire system is being shut down.
+  // If there is work, performs the read or write requested and re-enqueues the
+  // requesting context.
+  // Locks are not taken when reading from or writing to disk.
+  // The main loop has three parts:
+  //   1. GetNextRequestRange(): get the next request context (read or write) to
+  //      process and dequeue it.
+  //   2. For the dequeued request, gets the next scan- or write-range to process and
+  //      re-enqueues the request.
+  //   3. Perform the read or write as specified.
+  // Cancellation checking needs to happen in both steps 1 and 3.
+  while (true) {
+    RequestContext* worker_context = nullptr;
+    RequestRange* range = nullptr;
+
+    if (!GetNextRequestRange(disk_queue, &range, &worker_context)) {
+      DCHECK(shut_down_);
+      break;
+    }
+
+    if (range->request_type() == RequestType::READ) {
+      ReadRange(disk_queue, worker_context, static_cast<ScanRange*>(range));
+    } else {
+      DCHECK(range->request_type() == RequestType::WRITE);
+      Write(worker_context, static_cast<WriteRange*>(range));
+    }
+  }
+
+  DCHECK(shut_down_);
+}
+
+// This function reads the specified scan range associated with the
+// specified reader context and disk queue.
+void DiskIoMgr::ReadRange(
+    DiskQueue* disk_queue, RequestContext* reader, ScanRange* range) {
+  int64_t bytes_remaining = range->len_ - range->bytes_read_;
+  DCHECK_GT(bytes_remaining, 0);
+  unique_ptr<BufferDescriptor> buffer_desc;
+  if (range->external_buffer_tag_ == ScanRange::ExternalBufferTag::CLIENT_BUFFER) {
+    buffer_desc = unique_ptr<BufferDescriptor>(new BufferDescriptor(this, reader, range,
+        range->client_buffer_.data, range->client_buffer_.len, nullptr));
+  } else {
+    // Need to allocate a buffer to read into.
+    int64_t buffer_size = ::min(bytes_remaining, static_cast<int64_t>(max_buffer_size_));
+    buffer_desc = TryAllocateNextBufferForRange(disk_queue, reader, range, buffer_size);
+    if (buffer_desc == nullptr) return;
+  }
+  reader->num_used_buffers_.Add(1);
+
+  // No locks in this section.  Only working on local vars.  We don't want to hold a
+  // lock across the read call.
+  buffer_desc->status_ = range->Open(detail::is_file_handle_caching_enabled());
+  if (buffer_desc->status_.ok()) {
+    // Update counters.
+    if (reader->active_read_thread_counter_) {
+      reader->active_read_thread_counter_->Add(1L);
+    }
+    if (reader->disks_accessed_bitmap_) {
+      int64_t disk_bit = 1LL << disk_queue->disk_id;
+      reader->disks_accessed_bitmap_->BitOr(disk_bit);
+    }
+    SCOPED_TIMER(&read_timer_);
+    SCOPED_TIMER(reader->read_timer_);
+
+    buffer_desc->status_ = range->Read(buffer_desc->buffer_, buffer_desc->buffer_len_,
+        &buffer_desc->len_, &buffer_desc->eosr_);
+    buffer_desc->scan_range_offset_ = range->bytes_read_ - buffer_desc->len_;
+
+    if (reader->bytes_read_counter_ != nullptr) {
+      COUNTER_ADD(reader->bytes_read_counter_, buffer_desc->len_);
+    }
+
+    COUNTER_ADD(&total_bytes_read_counter_, buffer_desc->len_);
+    if (reader->active_read_thread_counter_) {
+      reader->active_read_thread_counter_->Add(-1L);
+    }
+  }
+
+  // Finished read, update reader/disk based on the results
+  HandleReadFinished(disk_queue, reader, move(buffer_desc));
+}
+
+unique_ptr<BufferDescriptor> DiskIoMgr::TryAllocateNextBufferForRange(
+    DiskQueue* disk_queue, RequestContext* reader, ScanRange* range,
+    int64_t buffer_size) {
+  DCHECK(reader->mem_tracker_ != nullptr);
+  bool enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY;
+  if (!enough_memory) {
+    // Low memory, GC all the buffers and try again.
+    GcIoBuffers();
+    enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY;
+  }
+
+  if (!enough_memory) {
+    RequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
+    unique_lock<mutex> reader_lock(reader->lock_);
+
+    // Just grabbed the reader lock, check for cancellation.
+    if (reader->state_ == RequestContext::Cancelled) {
+      DCHECK(reader->Validate()) << endl << reader->DebugString();
+      state.DecrementRequestThreadAndCheckDone(reader);
+      range->Cancel(reader->status_);
+      DCHECK(reader->Validate()) << endl << reader->DebugString();
+      return nullptr;
+    }
+
+    if (!range->ready_buffers_.empty()) {
+      // We have memory pressure and this range doesn't need another buffer
+      // (it already has one queued). Skip this range and pick it up later.
+      range->blocked_on_queue_ = true;
+      reader->blocked_ranges_.Enqueue(range);
+      state.DecrementRequestThread();
+      return nullptr;
+    } else {
+      // We need to get a buffer anyway since there are none queued. The query
+      // is likely to fail due to mem limits but there's nothing we can do about that
+      // now.
+    }
+  }
+  unique_ptr<BufferDescriptor> buffer_desc = GetFreeBuffer(reader, range, buffer_size);
+  DCHECK(buffer_desc != nullptr);
+  return buffer_desc;
+}
+
+void DiskIoMgr::Write(RequestContext* writer_context, WriteRange* write_range) {
+  Status ret_status = Status::OK();
+  FILE* file_handle = nullptr;
+  // Raw open() syscall will create the file if not present when passed these flags.
+  int fd = open(write_range->file(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
+  if (fd < 0) {
+    ret_status = Status(ErrorMsg(TErrorCode::DISK_IO_ERROR,
+        Substitute("Opening '$0' for write failed with errno=$1 description=$2",
+                                     write_range->file_, errno, GetStrErrMsg())));
+  } else {
+    file_handle = fdopen(fd, "wb");
+    if (file_handle == nullptr) {
+      ret_status = Status(ErrorMsg(TErrorCode::DISK_IO_ERROR,
+          Substitute("fdopen($0, \"wb\") failed with errno=$1 description=$2", fd, errno,
+                                       GetStrErrMsg())));
+    }
+  }
+
+  if (file_handle != nullptr) {
+    ret_status = WriteRangeHelper(file_handle, write_range);
+
+    int success = fclose(file_handle);
+    if (ret_status.ok() && success != 0) {
+      ret_status = Status(ErrorMsg(TErrorCode::DISK_IO_ERROR,
+          Substitute("fclose($0) failed", write_range->file_)));
+    }
+  }
+
+  HandleWriteFinished(writer_context, write_range, ret_status);
+}
+
+Status DiskIoMgr::WriteRangeHelper(FILE* file_handle, WriteRange* write_range) {
+  // Seek to the correct offset and perform the write.
+  int success = fseek(file_handle, write_range->offset(), SEEK_SET);
+  if (success != 0) {
+    return Status(ErrorMsg(TErrorCode::DISK_IO_ERROR,
+        Substitute("fseek($0, $1, SEEK_SET) failed with errno=$2 description=$3",
+        write_range->file_, write_range->offset(), errno, GetStrErrMsg())));
+  }
+
+#ifndef NDEBUG
+  if (FLAGS_stress_scratch_write_delay_ms > 0) {
+    SleepForMs(FLAGS_stress_scratch_write_delay_ms);
+  }
+#endif
+  int64_t bytes_written = fwrite(write_range->data_, 1, write_range->len_, file_handle);
+  if (bytes_written < write_range->len_) {
+    return Status(ErrorMsg(TErrorCode::DISK_IO_ERROR,
+        Substitute("fwrite(buffer, 1, $0, $1) failed with errno=$2 description=$3",
+        write_range->len_, write_range->file_, errno, GetStrErrMsg())));
+  }
+  if (ImpaladMetrics::IO_MGR_BYTES_WRITTEN != nullptr) {
+    ImpaladMetrics::IO_MGR_BYTES_WRITTEN->Increment(write_range->len_);
+  }
+
+  return Status::OK();
+}
+
+int DiskIoMgr::free_buffers_idx(int64_t buffer_size) {
+  int64_t buffer_size_scaled = BitUtil::Ceil(buffer_size, min_buffer_size_);
+  int idx = BitUtil::Log2Ceiling64(buffer_size_scaled);
+  DCHECK_GE(idx, 0);
+  DCHECK_LT(idx, free_buffers_.size());
+  return idx;
+}
+
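
(Aside: the free-list bucketing math in free_buffers_idx() and GetFreeBuffer() can be tried in isolation. The sketch below re-implements the two BitUtil helpers with assumed semantics and uses min_buffer_size_ = 1024 and some example request sizes as illustrative assumptions; it is not the Impala code itself.)

    #include <cstdint>
    #include <initializer_list>
    #include <iostream>

    // Stand-ins for BitUtil::Ceil and BitUtil::Log2Ceiling64 (assumed semantics).
    int64_t CeilDiv(int64_t value, int64_t divisor) {
      return (value + divisor - 1) / divisor;
    }
    int Log2Ceiling64(int64_t value) {
      int log2 = 0;
      while ((1LL << log2) < value) ++log2;
      return log2;
    }

    int main() {
      const int64_t min_buffer_size = 1024;  // assumed --min_buffer_size
      for (int64_t request : {1000LL, 1024LL, 1500LL, 70000LL}) {
        // Same two steps as free_buffers_idx() followed by GetFreeBuffer():
        int idx = Log2Ceiling64(CeilDiv(request, min_buffer_size));
        int64_t quantized = (1LL << idx) * min_buffer_size;
        // e.g. a 1500B request maps to bucket 1 and receives a 2048B buffer.
        std::cout << request << "B -> bucket " << idx << ", buffer "
                  << quantized << "B\n";
      }
      return 0;
    }

Each bucket thus holds buffers of exactly one size, a power-of-2 multiple of min_buffer_size_, which is what FreeBufferMemory() DCHECKs when a buffer is returned.
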
+Status DiskIoMgr::AddWriteRange(RequestContext* writer, WriteRange* write_range) {
+  unique_lock<mutex> writer_lock(writer->lock_);
+
+  if (writer->state_ == RequestContext::Cancelled) {
+    DCHECK(!writer->status_.ok());
+    return writer->status_;
+  }
+
+  writer->AddRequestRange(write_range, false);
+  return Status::OK();
+}
+
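
(Aside: a hypothetical caller-side sketch of the write path defined by WriteRange, AddWriteRange() and Write() above. It uses only the API visible in this patch; the std::function-compatible callback signature, Status::GetDetail(), and all names, paths, and values are assumptions, not part of the patch.)

    // Sketch: submit one write through the IoMgr (assumes the Impala tree;
    // not self-contained).
    void SubmitScratchWrite(DiskIoMgr* io_mgr, MemTracker* tracker,
        const uint8_t* data, int64_t len) {
      std::unique_ptr<RequestContext> writer = io_mgr->RegisterContext(tracker);

      // Invoked on a disk thread once Write()/WriteRangeHelper() finish, with
      // the Status passed to HandleWriteFinished().
      auto callback = [](const Status& write_status) {
        if (!write_status.ok()) LOG(ERROR) << write_status.GetDetail();
      };

      // disk_id == -1 lets AssignQueue() pick an arbitrary local disk queue.
      int disk_id = io_mgr->AssignQueue("/tmp/scratch/block0", -1, true);
      WriteRange range("/tmp/scratch/block0", /*file_offset=*/0, disk_id, callback);
      range.SetData(data, len);  // no copy: 'data' must stay valid until the callback

      // Non-blocking: the range is queued and a disk thread performs the write.
      Status status = io_mgr->AddWriteRange(writer.get(), &range);
      // ... the caller must wait for the callback before destroying 'range',
      // then call io_mgr->UnregisterContext(writer.get()) ...
    }
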
+int DiskIoMgr::AssignQueue(const char* file, int disk_id, bool expected_local) {
+  // If it's a remote range, check for an appropriate remote disk queue.
+  if (!expected_local) {
+    if (IsHdfsPath(file) && FLAGS_num_remote_hdfs_io_threads > 0) {
+      return RemoteDfsDiskId();
+    }
+    if (IsS3APath(file)) return RemoteS3DiskId();
+    if (IsADLSPath(file)) return RemoteAdlsDiskId();
+  }
+  // Assign to a local disk queue.
+  DCHECK(!IsS3APath(file)); // S3 is always remote.
+  DCHECK(!IsADLSPath(file)); // ADLS is always remote.
+  if (disk_id == -1) {
+    // disk id is unknown, assign it an arbitrary one.
+    disk_id = next_disk_id_.Add(1);
+  }
+  // TODO: we need to parse the config for the number of dirs configured for this
+  // data node.
+  return disk_id % num_local_disks();
+}
+
+HdfsFileHandle* DiskIoMgr::GetCachedHdfsFileHandle(const hdfsFS& fs,
+    std::string* fname, int64_t mtime, RequestContext *reader,
+    bool require_new) {
+  bool cache_hit;
+  HdfsFileHandle* fh = file_handle_cache_.GetFileHandle(fs, fname, mtime, require_new,
+      &cache_hit);
+  if (fh == nullptr) return nullptr;
+  if (cache_hit) {
+    DCHECK(!require_new);
+    ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO->Update(1L);
+    ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_COUNT->Increment(1L);
+    reader->cached_file_handles_hit_count_.Add(1L);
+  } else {
+    ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO->Update(0L);
+    ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_MISS_COUNT->Increment(1L);
+    reader->cached_file_handles_miss_count_.Add(1L);
+  }
+  return fh;
+}
+
+void DiskIoMgr::ReleaseCachedHdfsFileHandle(std::string* fname, HdfsFileHandle* fid,
+    bool destroy_handle) {
+  file_handle_cache_.ReleaseFileHandle(fname, fid, destroy_handle);
+}
+
+Status DiskIoMgr::ReopenCachedHdfsFileHandle(const hdfsFS& fs, std::string* fname,
+    int64_t mtime, HdfsFileHandle** fid) {
+  bool cache_hit;
+  file_handle_cache_.ReleaseFileHandle(fname, *fid, true);
+  // The old handle has been destroyed, so *fid must be overwritten before returning.
+  *fid = file_handle_cache_.GetFileHandle(fs, fname, mtime, true,
+      &cache_hit);
+  if (*fid == nullptr) {
+    return Status(TErrorCode::DISK_IO_ERROR,
+        GetHdfsErrorMsg("Failed to open HDFS file ", fname->data()));
+  }
+  DCHECK(!cache_hit);
+  return Status::OK();
+}
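
(Aside: tying this file to the header below, a caller's read loop would look roughly like the following sketch. It mirrors the pseudocode in the disk-io-mgr.h comment; the function name, the error-handling shape, and any buffer accessors beyond eosr() are assumptions beyond what the patch shows.)

    // Sketch of a single reading thread (assumes the Impala tree; not self-contained).
    Status ScanAllRanges(DiskIoMgr* io_mgr, MemTracker* tracker,
        const std::vector<ScanRange*>& ranges) {
      std::unique_ptr<RequestContext> reader = io_mgr->RegisterContext(tracker);
      // Non-blocking: declare every range that will eventually be read.
      RETURN_IF_ERROR(io_mgr->AddScanRanges(reader.get(), ranges, false));

      while (true) {
        ScanRange* range;
        // Blocking: the IoMgr picks the next range based on disk load and starts
        // buffering reads for it; *range is nullptr once all ranges are done.
        RETURN_IF_ERROR(io_mgr->GetNextRange(reader.get(), &range));
        if (range == nullptr) break;

        bool eosr = false;
        while (!eosr) {
          std::unique_ptr<BufferDescriptor> buffer;
          RETURN_IF_ERROR(range->GetNext(&buffer));  // blocking
          eosr = buffer->eosr();
          // ... process the buffer's bytes here ...
          io_mgr->ReturnBuffer(std::move(buffer));  // required so memory is recycled
        }
      }
      io_mgr->UnregisterContext(reader.get());
      return Status::OK();
    }

To use more reading threads, each thread would simply run the same loop against the shared RequestContext, matching the header's note that multiple disk threads and multiple callers can operate on one context.
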

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/io/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
new file mode 100644
index 0000000..71dc840
--- /dev/null
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -0,0 +1,550 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_IO_DISK_IO_MGR_H
+#define IMPALA_RUNTIME_IO_DISK_IO_MGR_H
+
+#include <deque>
+#include <functional>
+#include <vector>
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/unordered_set.hpp>
+#include <boost/thread/mutex.hpp>
+
+#include "common/atomic.h"
+#include "common/hdfs.h"
+#include "common/object-pool.h"
+#include "common/status.h"
+#include "runtime/io/handle-cache.h"
+#include "runtime/io/request-ranges.h"
+#include "runtime/thread-resource-mgr.h"
+#include "util/aligned-new.h"
+#include "util/bit-util.h"
+#include "util/condition-variable.h"
+#include "util/error-util.h"
+#include "util/runtime-profile.h"
+#include "util/thread.h"
+
+namespace impala {
+
+class MemTracker;
+
+namespace io {
+/// Manager object that schedules IO for all queries on all disks and remote filesystems
+/// (such as S3). Each query maps to one or more RequestContext objects, each of which
+/// has its own queue of scan ranges and/or write ranges.
+//
+/// The API splits up requesting scan/write ranges (non-blocking) and reading the data
+/// (blocking). The DiskIoMgr has worker threads that will read from and write to
+/// disk/hdfs/remote-filesystems, allowing interleaving of IO and CPU. This allows us to
+/// keep all disks and all cores as busy as possible.
+//
+/// All public APIs are thread-safe. It is not valid to call any of the APIs after
+/// UnregisterContext() returns.
+//
+/// For Readers:
+/// We can model this problem as a multiple producer (threads for each disk), multiple
+/// consumer (scan ranges) problem. There are multiple queues that need to be
+/// synchronized. Conceptually, there are two queues:
+///   1. The per disk queue: this contains a queue of readers that need reads.
+///   2. The per scan range ready-buffer queue: this contains buffers that have been
+///      read and are ready for the caller.
+/// The disk queue contains a queue of readers and is scheduled in a round robin fashion.
+/// Readers map to scan nodes. The reader then contains a queue of scan ranges. The caller
+/// asks the IoMgr for the next range to process. The IoMgr then selects the best range
+/// to read based on disk activity and begins reading and queuing buffers for that range.
+/// TODO: We should map readers to queries. A reader is the unit of scheduling and queries
+/// that have multiple scan nodes shouldn't have more 'turns'.
+//
+/// For Writers:
+/// Data is written via AddWriteRange(). This is non-blocking and adds a WriteRange to a
+/// per-disk queue. After the write is complete, a callback in WriteRange is invoked.
+/// No memory is allocated within IoMgr for writes and no copies are made. It is the
+/// responsibility of the client to ensure that the data to be written is valid and that
+/// the file to be written to exists until the callback is invoked.
+//
+/// The IoMgr provides three key APIs.
+///  1. AddScanRanges: this is non-blocking and tells the IoMgr all the ranges that
+///     will eventually need to be read.
+///  2. GetNextRange: returns to the caller the next scan range it should process.
+///     This is based on disk load. This also begins reading the data in this scan
+///     range. This is blocking.
+///  3. ScanRange::GetNext: returns the next buffer for this range.  This is blocking.
+//
+/// The disk threads do not synchronize with each other. The readers and writers don't
+/// synchronize with each other. There is a lock and condition variable for each request
+/// context queue and each disk queue.
+/// IMPORTANT: whenever both locks are needed, the lock order is to grab the context lock
+/// before the disk lock.
+//
+/// Scheduling: If there are multiple request contexts with work for a single disk, the
+/// request contexts are scheduled in round-robin order. Multiple disk threads can
+/// operate on the same request context. Exactly one request range is processed by a
+/// disk thread at a time. If there are multiple scan ranges scheduled via
+/// GetNextRange() for a single context, these are processed in round-robin order.
+/// If there are multiple scan and write ranges for a disk, a read is always followed
+/// by a write, and a write is followed by a read, i.e. reads and writes alternate.
+/// If multiple write ranges are enqueued for a single disk, they will be processed
+/// by the disk threads in order, but may complete in any order. No guarantees are made
+/// on ordering of writes across disks.
+//
+/// Resource Management: effective resource management in the IoMgr is key to good
+/// performance. The IoMgr helps coordinate two resources: CPU and disk. For CPU,
+/// spinning up too many threads causes thrashing.
+/// Memory usage in the IoMgr comes from queued read buffers.  If we queue the minimum
+/// (i.e. 1), then the disks are idle while we are processing the buffer. If we don't
+/// limit the queue, then it is possible we end up queueing the entire data set (i.e. CPU
+/// is slower than disks) and run out of memory.
+/// For both CPU and memory, we want to model the machine as having a fixed amount of
+/// resources.  If a single query is running, it should saturate either CPU or Disk
+/// as well as using as little memory as possible. With multiple queries, each query
+/// should get less CPU. In that case each query will need fewer queued buffers and
+/// therefore have less memory usage.
+//
+/// The IoMgr defers CPU management to the caller. The IoMgr provides a GetNextRange
+/// API which will return the next scan range the caller should process. The caller
+/// can call this from the desired number of reading threads. Once a scan range
+/// has been returned via GetNextRange, the IoMgr will start to buffer reads for
+/// that range and it is expected the caller will pull those buffers promptly. For
+/// example, if the caller would like to have 1 scanner thread, the read loop
+/// would look like:
+///   while (more_ranges)
+///     range = GetNextRange()
+///     while (!range.eosr)
+///       buffer = range.GetNext()
+/// To have multiple reading threads, the caller would simply spin up the threads
+/// and each would process the loops above.
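+///
+/// Fleshed out in C++, a single-threaded read loop might look like the following
+/// sketch (editorial illustration, not part of the original comment; it assumes
+/// 'io_mgr', a registered 'reader' context with scan ranges already added, and
+/// elides error handling):
+///
+///   ScanRange* range;
+///   // Blocks until a range is scheduled; sets 'range' to nullptr when all done.
+///   while (io_mgr->GetNextRange(reader, &range).ok() && range != nullptr) {
+///     bool eosr = false;
+///     while (!eosr) {
+///       std::unique_ptr<BufferDescriptor> buffer;
+///       if (!range->GetNext(&buffer).ok()) break;  // blocking per-buffer read
+///       eosr = buffer->eosr();                     // check before returning buffer
+///       // ... process buffer->buffer() / buffer->len() ...
+///       io_mgr->ReturnBuffer(std::move(buffer));   // required for every buffer
+///     }
+///   }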
+//
+/// To control the number of IO buffers, each scan range has a limit of two queued
+/// buffers (SCAN_RANGE_READY_BUFFER_LIMIT). If the number of buffers is at capacity,
+/// the IoMgr will no longer read for that scan range until the caller has processed
+/// a buffer. Assuming the client returns each buffer before requesting the next one
+/// from the scan range, then this will consume up to 3 * 8MB = 24MB of I/O buffers per
+/// scan range.
+//
+/// Buffer Management:
+/// Buffers for reads are either a) allocated by the IoMgr and transferred to the caller,
+/// b) cached HDFS buffers if the scan range uses HDFS caching, or c) provided by the
+/// caller when constructing the scan range.
+///
+/// As a caller reads from a scan range, these buffers are wrapped in BufferDescriptors
+/// and returned to the caller. The caller must always call ReturnBuffer() on the buffer
+/// descriptor to allow recycling of the associated buffer (if there is an
+/// IoMgr-allocated or HDFS cached buffer).
+///
+/// Caching support:
+/// Scan ranges contain metadata on whether or not the range is cached on the DN. In
+/// that case, we use the HDFS APIs to read the cached data without doing any copies.
+/// For these ranges, the reads happen on the caller thread (as opposed to the disk
+/// threads).
+/// It is possible for the cached read APIs to fail, in which case the ranges are then
+/// queued on the disk threads and behave identically to the case where the range
+/// is not cached.
+/// Resources for these ranges are also not accounted against the reader because none
+/// are consumed.
+/// While a cached block is being processed, the block is mlocked. We want to minimize
+/// the time the mlock is held.
+///   - HDFS will time us out if we hold onto the mlock for too long.
+///   - Holding the lock prevents uncaching this file due to a caching policy change.
+/// Therefore, we only issue the cached read when the caller is ready to process the
+/// range (GetNextRange()) instead of when the ranges are issued. This guarantees that
+/// there will be a CPU available to process the buffer and any throttling we do with
+/// the number of scanner threads properly controls the number of files we mlock.
+/// With cached scan ranges, we cannot close the scan range until the cached buffer
+/// is returned (HDFS does not allow this). We therefore need to defer the close until
+/// the cached buffer is returned (ReturnBuffer()).
+//
+/// Remote filesystem support (e.g. S3):
+/// Remote filesystems are modeled as "remote disks". That is, there is a separate disk
+/// queue for each supported remote filesystem type. In order to maximize throughput,
+/// multiple connections are opened in parallel by having multiple threads running per
+/// queue. Also note that reading from a remote filesystem service can be more CPU
+/// intensive than local disk/hdfs because of non-direct I/O and SSL processing, and can
+/// become CPU bottlenecked, especially if not enough I/O threads for these queues are
+/// started.
+//
+/// TODO: IoMgr should be able to request additional scan ranges from the coordinator
+/// to help deal with stragglers.
+/// TODO: look into using a lock-free queue
+/// TODO: simplify the common path (less locking, fewer memory allocations).
+//
+/// Structure of the Implementation:
+///  - All client APIs are defined in this file, request-ranges.h and request-context.h.
+///    Clients can include only the files that they need.
+///  - Some internal classes are defined in disk-io-mgr-internal.h.
+///  - ScanRange APIs are implemented in scan-range.cc.
+///    This contains the ready buffer queue logic.
+///  - RequestContext APIs are implemented in request-context.cc.
+///    This contains the logic for picking scan ranges for a reader.
+///  - Disk thread and general APIs are implemented in disk-io-mgr.cc.
+///  - The handle cache is implemented in handle-cache{.inline,}.h.
+
+// This is cache line aligned because the FileHandleCache needs cache line alignment
+// for its partitions.
+class DiskIoMgr : public CacheLineAligned {
+ public:
+  /// Create a DiskIoMgr object. This constructor is only used for testing.
+  ///  - num_disks: The number of disks the IoMgr should use. This is used for testing.
+  ///    Specify 0 to have the disk IoMgr query the OS for the number of disks.
+  ///  - threads_per_rotational_disk: number of read threads to create per rotational
+  ///    disk. This is also the max queue depth.
+  ///  - threads_per_solid_state_disk: number of read threads to create per solid state
+  ///    disk. This is also the max queue depth.
+  ///  - min_buffer_size: minimum I/O buffer size (in bytes).
+  ///  - max_buffer_size: maximum I/O buffer size (in bytes). Also the max read size.
+  DiskIoMgr(int num_disks, int threads_per_rotational_disk,
+      int threads_per_solid_state_disk, int min_buffer_size, int max_buffer_size);
+
+  /// Create DiskIoMgr with default configs.
+  DiskIoMgr();
+
+  /// Clean up all threads and resources. This is mostly useful for testing since
+  /// for impalad, this object is never destroyed.
+  ~DiskIoMgr();
+
+  /// Initialize the IoMgr. Must be called once before any of the other APIs.
+  Status Init(MemTracker* process_mem_tracker) WARN_UNUSED_RESULT;
+
+  /// Allocates tracking structure for a request context.
+  /// Register a new request context and return it to the caller. The caller must call
+  /// UnregisterContext() for each context.
+  /// reader_mem_tracker: non-null only for readers. IO buffers used for this reader
+  ///    will be tracked by this tracker. If the limit is exceeded, the reader will be
+  ///    cancelled and MEM_LIMIT_EXCEEDED will be returned via GetNext().
+  std::unique_ptr<RequestContext> RegisterContext(MemTracker* reader_mem_tracker);
+
+  /// Unregisters context from the disk IoMgr by first cancelling it then blocking until
+  /// all references to the context are removed from I/O manager internal data
+  /// structures. This must be called for every RegisterContext() to ensure that the
+  /// context object can be safely destroyed. It is invalid to add more request ranges
+  /// to 'context' after this call. This call blocks until all the disk threads have
+  /// finished cleaning up.
+  void UnregisterContext(RequestContext* context);
+
+  /// This function cancels the context asynchronously. All outstanding requests
+  /// are aborted and tracking structures cleaned up. This does not need to be
+  /// called if the context finishes normally.
+  /// This will also fail any outstanding GetNext()/Read() requests.
+  void CancelContext(RequestContext* context);
+
+  /// Adds the scan ranges to the queues. This call is non-blocking. The caller must
+  /// not deallocate the scan range pointers before UnregisterContext().
+  /// If schedule_immediately is true, the ranges are immediately put on the read queue
+  /// (i.e. the caller should not/cannot call GetNextRange for these ranges).
+  /// This can be used to do synchronous reads as well as to schedule dependent ranges,
+  /// as in the case of columnar formats.
+  Status AddScanRanges(RequestContext* reader,
+      const std::vector<ScanRange*>& ranges,
+      bool schedule_immediately = false) WARN_UNUSED_RESULT;
+  Status AddScanRange(RequestContext* reader, ScanRange* range,
+      bool schedule_immediately = false) WARN_UNUSED_RESULT;
+
+  /// Add a WriteRange for the writer. This is non-blocking and schedules the context
+  /// on the IoMgr disk queue. Does not create any files.
+  Status AddWriteRange(
+      RequestContext* writer, WriteRange* write_range) WARN_UNUSED_RESULT;
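+
+  // A minimal write-path sketch (editorial illustration, not part of the original
+  // header; it assumes 'io_mgr', a registered 'writer' context, and the WriteRange
+  // constructor/SetData() declared in request-ranges.h):
+  //
+  //   WriteRange::WriteDoneCallback callback = [](const Status& write_status) {
+  //     // Runs on a disk thread once the write finishes; inspect 'write_status'.
+  //   };
+  //   WriteRange write_range(tmp_file_path, /*file_offset=*/0, /*disk_id=*/0, callback);
+  //   write_range.SetData(data, len);  // 'data' must remain valid until the callback
+  //   RETURN_IF_ERROR(io_mgr->AddWriteRange(writer, &write_range));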
+
+  /// Returns the next unstarted scan range for this reader. When the range is returned,
+  /// the disk threads in the IoMgr will already have started reading from it. The
+  /// caller is expected to call ScanRange::GetNext on the returned range.
+  /// If there are no more unstarted ranges, nullptr is returned.
+  /// This call is blocking.
+  Status GetNextRange(RequestContext* reader, ScanRange** range) WARN_UNUSED_RESULT;
+
+  /// Reads the range and returns the result in buffer.
+  /// This behaves like the typical synchronous read() API, blocking until the data
+  /// is read. This can be called while there are outstanding ScanRanges and is
+  /// thread safe. Multiple threads can be calling Read() per reader at a time.
+  /// range *cannot* have already been added via AddScanRanges.
+  /// This can only be used if the scan range fits in a single IO buffer (i.e. is
+  /// smaller than max_read_buffer_size()) or if reading into a client-provided buffer.
+  Status Read(RequestContext* reader, ScanRange* range,
+      std::unique_ptr<BufferDescriptor>* buffer) WARN_UNUSED_RESULT;
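+
+  // Sketch of a one-off synchronous read (editorial illustration; assumes 'io_mgr',
+  // a registered 'reader' context and the ScanRange::Reset() overload declared in
+  // request-ranges.h; the range must not also be added via AddScanRanges()):
+  //
+  //   ScanRange range;
+  //   range.Reset(fs, file_name, len, offset, disk_id, expected_local, buffer_opts);
+  //   std::unique_ptr<BufferDescriptor> buffer;
+  //   RETURN_IF_ERROR(io_mgr->Read(reader, &range, &buffer));
+  //   // ... consume buffer->buffer() / buffer->len() ...
+  //   io_mgr->ReturnBuffer(std::move(buffer));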
+
+  /// Returns the buffer to the IoMgr. This must be called for every buffer
+  /// returned by GetNext()/Read() that did not return an error. This is non-blocking.
+  /// After calling this, the buffer descriptor is invalid and cannot be accessed.
+  void ReturnBuffer(std::unique_ptr<BufferDescriptor> buffer);
+
+  /// Determine which disk queue this file should be assigned to. Returns an index into
+  /// disk_queues_. The disk_id is the volume ID for the local disk that holds the
+  /// file, or -1 if unknown. Flag expected_local is true iff this impalad is
+  /// co-located with the datanode for this file.
+  int AssignQueue(const char* file, int disk_id, bool expected_local);
+
+  /// TODO: The functions below can be moved to RequestContext.
+  /// Returns the current status of the context.
+  Status context_status(RequestContext* context) const WARN_UNUSED_RESULT;
+
+  void set_bytes_read_counter(RequestContext*, RuntimeProfile::Counter*);
+  void set_read_timer(RequestContext*, RuntimeProfile::Counter*);
+  void set_active_read_thread_counter(RequestContext*, RuntimeProfile::Counter*);
+  void set_disks_access_bitmap(RequestContext*, RuntimeProfile::Counter*);
+
+  int64_t queue_size(RequestContext* reader) const;
+  int64_t bytes_read_local(RequestContext* reader) const;
+  int64_t bytes_read_short_circuit(RequestContext* reader) const;
+  int64_t bytes_read_dn_cache(RequestContext* reader) const;
+  int num_remote_ranges(RequestContext* reader) const;
+  int64_t unexpected_remote_bytes(RequestContext* reader) const;
+  int cached_file_handles_hit_count(RequestContext* reader) const;
+  int cached_file_handles_miss_count(RequestContext* reader) const;
+
+  /// Returns the read throughput across all readers.
+  /// TODO: should this be a sliding window? This should report metrics for the
+  /// last minute, hour and since the beginning.
+  int64_t GetReadThroughput();
+
+  /// Returns the maximum read buffer size
+  int max_read_buffer_size() const { return max_buffer_size_; }
+
+  /// Returns the total number of disk queues (both local and remote).
+  int num_total_disks() const { return disk_queues_.size(); }
+
+  /// Returns the total number of remote "disk" queues.
+  int num_remote_disks() const { return REMOTE_NUM_DISKS; }
+
+  /// Returns the number of local disks attached to the system.
+  int num_local_disks() const { return num_total_disks() - num_remote_disks(); }
+
+  /// The disk ID (and therefore disk_queues_ index) used for DFS accesses.
+  int RemoteDfsDiskId() const { return num_local_disks() + REMOTE_DFS_DISK_OFFSET; }
+
+  /// The disk ID (and therefore disk_queues_ index) used for S3 accesses.
+  int RemoteS3DiskId() const { return num_local_disks() + REMOTE_S3_DISK_OFFSET; }
+
+  /// The disk ID (and therefore disk_queues_ index) used for ADLS accesses.
+  int RemoteAdlsDiskId() const { return num_local_disks() + REMOTE_ADLS_DISK_OFFSET; }
+
+  /// Dumps the disk IoMgr queues (for readers and disks)
+  std::string DebugString();
+
+  /// Validates the internal state is consistent. This is intended to only be used
+  /// for debugging.
+  bool Validate() const;
+
+  /// Given a FS handle, name and last modified time of the file, gets an HdfsFileHandle
+  /// from the file handle cache. If 'require_new_handle' is true, the cache will open
+  /// a fresh file handle. On success, records statistics about whether this was
+  /// a cache hit or miss in the 'reader' as well as at the system level. Returns
+  /// nullptr in case of an error.
+  HdfsFileHandle* GetCachedHdfsFileHandle(const hdfsFS& fs,
+      std::string* fname, int64_t mtime, RequestContext* reader,
+      bool require_new_handle);
+
+  /// Releases a file handle back to the file handle cache when it is no longer in use.
+  /// If 'destroy_handle' is true, the file handle cache will close the file handle
+  /// immediately.
+  void ReleaseCachedHdfsFileHandle(std::string* fname, HdfsFileHandle* fid,
+      bool destroy_handle);
+
+  /// Reopens a file handle by destroying the file handle and getting a fresh
+  /// file handle from the cache. Returns an error if the file could not be reopened.
+  Status ReopenCachedHdfsFileHandle(const hdfsFS& fs, std::string* fname, int64_t mtime,
+      HdfsFileHandle** fid);
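+
+  // Typical get/release pairing for cached handles (editorial sketch; 'fname' is a
+  // std::string and error handling is elided):
+  //
+  //   HdfsFileHandle* fid = io_mgr->GetCachedHdfsFileHandle(fs, &fname, mtime,
+  //       reader, /*require_new_handle=*/false);
+  //   if (fid == nullptr) return Status("Failed to open " + fname);
+  //   // ... read via fid->file() while holding exclusive use of the handle ...
+  //   io_mgr->ReleaseCachedHdfsFileHandle(&fname, fid, /*destroy_handle=*/false);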
+
+  /// Garbage collect unused I/O buffers up to 'bytes_to_free', or all the buffers if
+  /// 'bytes_to_free' is -1.
+  void GcIoBuffers(int64_t bytes_to_free = -1);
+
+  /// The maximum number of ready buffers that can be queued in a scan range. Having two
+  /// queued buffers (plus the buffer that is returned to the client) gives good
+  /// performance in most scenarios:
+  /// 1. If the consumer is consuming data faster than we can read from disk, then the
+  ///    queue will be empty most of the time because the buffer will be immediately
+  ///    pulled off the queue as soon as it is added. There will always be an I/O request
+  ///    in the disk queue to maximize I/O throughput, which is the bottleneck in this
+  ///    case.
+  /// 2. If we can read from disk faster than the consumer is consuming data, the queue
+  ///    will fill up and there will always be a buffer available for the consumer to
+  ///    read, so the consumer will not block and we maximize consumer throughput, which
+  ///    is the bottleneck in this case.
+  /// 3. If the consumer is consuming data at approximately the same rate as we are
+  ///    reading from disk, then the steady state is that the consumer is processing one
+  ///    buffer and one buffer is in the disk queue. The additional buffer can absorb
+  ///    bursts where the producer runs faster than the consumer or the consumer runs
+  ///    faster than the producer without blocking either the producer or consumer.
+  static const int SCAN_RANGE_READY_BUFFER_LIMIT = 2;
+
+  /// "Disk" queue offsets for remote accesses.  Offset 0 corresponds to
+  /// disk ID (i.e. disk_queue_ index) of num_local_disks().
+  enum {
+    REMOTE_DFS_DISK_OFFSET = 0,
+    REMOTE_S3_DISK_OFFSET,
+    REMOTE_ADLS_DISK_OFFSET,
+    REMOTE_NUM_DISKS
+  };
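+
+  // For example, on a host with 12 local disks, disk_queues_ has 15 entries:
+  // indices 0-11 are the local disk queues, RemoteDfsDiskId() == 12,
+  // RemoteS3DiskId() == 13 and RemoteAdlsDiskId() == 14 (REMOTE_NUM_DISKS == 3).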
+
+ private:
+  friend class BufferDescriptor;
+  friend class RequestContext;
+  // TODO: remove io:: prefix - it is required for the "using ScanRange" workaround above.
+  friend class io::ScanRange;
+  struct DiskQueue;
+
+  friend class DiskIoMgrTest_Buffers_Test;
+  friend class DiskIoMgrTest_VerifyNumThreadsParameter_Test;
+
+  /// Memory tracker for unused I/O buffers owned by DiskIoMgr.
+  boost::scoped_ptr<MemTracker> free_buffer_mem_tracker_;
+
+  /// Memory tracker for I/O buffers where the RequestContext has no MemTracker.
+  /// TODO: once IMPALA-3200 is fixed, there should be no more cases where readers don't
+  /// provide a MemTracker.
+  boost::scoped_ptr<MemTracker> unowned_buffer_mem_tracker_;
+
+  /// Number of worker (read) threads per rotational disk. Also the max depth of queued
+  /// work to the disk.
+  const int num_io_threads_per_rotational_disk_;
+
+  /// Number of worker (read) threads per solid state disk. Also the max depth of queued
+  /// work to the disk.
+  const int num_io_threads_per_solid_state_disk_;
+
+  /// Maximum read size. This is also the maximum size of each allocated buffer.
+  const int max_buffer_size_;
+
+  /// The minimum size of each read buffer.
+  const int min_buffer_size_;
+
+  /// Thread group containing all the worker threads.
+  ThreadGroup disk_thread_group_;
+
+  /// Options object for cached hdfs reads. Set on startup and never modified.
+  struct hadoopRzOptions* cached_read_options_ = nullptr;
+
+  /// True if the IoMgr should be torn down. Worker threads watch for this to
+  /// know to terminate. This variable is read/written to by different threads.
+  volatile bool shut_down_;
+
+  /// Total bytes read by the IoMgr.
+  RuntimeProfile::Counter total_bytes_read_counter_;
+
+  /// Total time spent in hdfs reading
+  RuntimeProfile::Counter read_timer_;
+
+  /// Protects free_buffers_
+  boost::mutex free_buffers_lock_;
+
+  /// Free buffers that can be handed out to clients. There is one list for each buffer
+  /// size, indexed by the Log2 of the buffer size in units of min_buffer_size_. The
+  /// maximum buffer size is max_buffer_size_, so the maximum index is
+  /// Log2(max_buffer_size_ / min_buffer_size_).
+  //
+  /// E.g. if min_buffer_size_ = 1024 bytes:
+  ///  free_buffers_[0]  => list of free buffers with size 1024 B
+  ///  free_buffers_[1]  => list of free buffers with size 2048 B
+  ///  free_buffers_[10] => list of free buffers with size 1 MB
+  ///  free_buffers_[13] => list of free buffers with size 8 MB
+  ///  free_buffers_[n]  => list of free buffers with size 2^n * 1024 B
+  std::vector<std::deque<uint8_t*>> free_buffers_;
+
+  /// Total number of allocated buffers, used for debugging.
+  AtomicInt32 num_allocated_buffers_;
+
+  /// Total number of buffers in readers
+  AtomicInt32 num_buffers_in_readers_;
+
+  /// Per disk queues. This is static and created once at Init() time. One queue is
+  /// allocated for each local disk on the system and for each remote filesystem type.
+  /// It is indexed by disk id.
+  std::vector<DiskQueue*> disk_queues_;
+
+  /// The next disk queue to write to if the actual 'disk_id_' is unknown (i.e. the file
+  /// is not associated with a particular local disk or remote queue). Used to implement
+  /// round-robin assignment for that case.
+  static AtomicInt32 next_disk_id_;
+
+  // Number of file handle cache partitions to use
+  static const size_t NUM_FILE_HANDLE_CACHE_PARTITIONS = 16;
+
+  // Caching structure that maps file names to cached file handles. The cache has an
+  // upper limit of entries defined by FLAGS_max_cached_file_handles. Evicted cached
+  // file handles are closed.
+  FileHandleCache<NUM_FILE_HANDLE_CACHE_PARTITIONS> file_handle_cache_;
+
+  /// Returns the index into free_buffers_ for a given buffer size
+  int free_buffers_idx(int64_t buffer_size);
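+
+  // A sketch of that mapping (editorial illustration; the real implementation may
+  // round 'buffer_size' up and use a BitUtil log2 helper instead of a loop):
+  //
+  //   int64_t units = buffer_size / min_buffer_size_;  // a power of two by contract
+  //   int idx = 0;
+  //   while (units > 1) { units >>= 1; ++idx; }        // idx = Log2(units)
+  //   DCHECK_LT(idx, static_cast<int>(free_buffers_.size()));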
+
+  /// Returns a buffer to read into with size between 'buffer_size' and
+  /// 'max_buffer_size_'. If there is an appropriately-sized free buffer in
+  /// 'free_buffers_', that is returned, otherwise a new one is allocated.
+  /// The returned *buffer_size must be between 0 and 'max_buffer_size_'.
+  /// The buffer memory is tracked against the reader's mem tracker, or
+  /// 'unowned_buffer_mem_tracker_' if the reader does not have one.
+  std::unique_ptr<BufferDescriptor> GetFreeBuffer(
+      RequestContext* reader, ScanRange* range, int64_t buffer_size);
+
+  /// Disassociates the desc->buffer_ memory from 'desc' (which cannot be nullptr),
+  /// either freeing it or returning it to 'free_buffers_'. Memory tracking is updated
+  /// to reflect the transfer of ownership from desc->mem_tracker_ to the disk I/O mgr.
+  void FreeBufferMemory(BufferDescriptor* desc);
+
+  /// Disk worker thread loop. This function retrieves the next range to process on
+  /// the disk queue and invokes ReadRange() or Write() depending on the type of Range().
+  /// There can be multiple threads per disk running this loop.
+  void WorkLoop(DiskQueue* queue);
+
+  /// This is called from the disk thread to get the next range to process. It will
+  /// wait until a scan range and buffer are available, or a write range is available.
+  /// This function returns the range to process.
+  /// Only returns false if the disk thread should be shut down.
+  /// No locks should be taken before this function call and none are left taken after.
+  bool GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
+      RequestContext** request_context);
+
+  /// Updates disk queue and reader state after a read is complete. The read result
+  /// is captured in the buffer descriptor.
+  void HandleReadFinished(DiskQueue* disk_queue, RequestContext* reader,
+      std::unique_ptr<BufferDescriptor> buffer);
+
+  /// Invokes write_range->callback_ after the range has been written and
+  /// updates per-disk state and handle state. The status of the write (OK,
+  /// RUNTIME_ERROR, etc.) is passed via write_status and to the callback.
+  /// The write_status does not affect the writer->status_. That is, a write error does
+  /// not cancel the writer context - that decision is left to the callback handler.
+  /// TODO: On the read path, consider not canceling the reader context on error.
+  void HandleWriteFinished(
+      RequestContext* writer, WriteRange* write_range, const Status& write_status);
+
+  /// Validates that range is correctly initialized
+  Status ValidateScanRange(ScanRange* range) WARN_UNUSED_RESULT;
+
+  /// Writes the specified range to disk and calls HandleWriteFinished when done.
+  /// Responsible for opening and closing the file that is written.
+  void Write(RequestContext* writer_context, WriteRange* write_range);
+
+  /// Helper method to write a range using the specified FILE handle. Returns Status::OK
+  /// if the write succeeded, or a RUNTIME_ERROR with an appropriate message otherwise.
+  /// Does not open or close the file that is written.
+  Status WriteRangeHelper(FILE* file_handle, WriteRange* write_range) WARN_UNUSED_RESULT;
+
+  /// Reads the specified scan range and calls HandleReadFinished when done.
+  void ReadRange(DiskQueue* disk_queue, RequestContext* reader, ScanRange* range);
+
+  /// Try to allocate the next buffer for the scan range, returning the new buffer
+  /// if successful. If 'reader' is cancelled, cancels the range and returns nullptr.
+  /// If there is memory pressure and buffers are already queued, adds the range
+  /// to the blocked ranges and returns nullptr.
+  std::unique_ptr<BufferDescriptor> TryAllocateNextBufferForRange(DiskQueue* disk_queue,
+      RequestContext* reader, ScanRange* range, int64_t buffer_size);
+};
+}
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/io/handle-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/handle-cache.h b/be/src/runtime/io/handle-cache.h
new file mode 100644
index 0000000..78f91cd
--- /dev/null
+++ b/be/src/runtime/io/handle-cache.h
@@ -0,0 +1,197 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_DISK_IO_MGR_HANDLE_CACHE_H
+#define IMPALA_RUNTIME_DISK_IO_MGR_HANDLE_CACHE_H
+
+#include <array>
+#include <list>
+#include <map>
+#include <memory>
+
+#include <boost/thread/mutex.hpp>
+
+#include "common/hdfs.h"
+#include "common/status.h"
+#include "util/aligned-new.h"
+#include "util/impalad-metrics.h"
+#include "util/spinlock.h"
+#include "util/thread.h"
+
+namespace impala {
+namespace io {
+
+/// This class is a small wrapper around the hdfsFile handle and the file system
+/// instance which is needed to close the file handle. The handle incorporates
+/// the last modified time of the file when it was opened. This is used to distinguish
+/// between file handles for files that can be updated or overwritten.
+class HdfsFileHandle {
+ public:
+
+  /// Constructor will open the file
+  HdfsFileHandle(const hdfsFS& fs, const char* fname, int64_t mtime);
+
+  /// Destructor will close the file handle
+  ~HdfsFileHandle();
+
+  hdfsFile file() const { return hdfs_file_; }
+  int64_t mtime() const { return mtime_; }
+  bool ok() const { return hdfs_file_ != nullptr; }
+
+ private:
+  hdfsFS fs_;
+  hdfsFile hdfs_file_;
+  int64_t mtime_;
+};
+
+/// The FileHandleCache is a data structure that owns HdfsFileHandles to share between
+/// threads. The HdfsFileHandles are hash partitioned across NUM_PARTITIONS partitions.
+/// Each partition operates independently with its own locks, reducing contention
+/// between concurrent threads. The `capacity` is split between the partitions and is
+/// enforced independently.
+///
+/// Threads check out a file handle for exclusive access and return it when finished.
+/// If the file handle is not already present in the cache or all file handles for this
+/// file are checked out, the file handle is constructed and added to the cache.
+/// The cache can contain multiple file handles for the same file. If a file handle
+/// is checked out, it cannot be evicted from the cache. In this case, the cache can
+/// exceed the specified capacity.
+///
+/// The file handle cache is currently not suitable for remote files that maintain a
+/// connection as part of the handle. Most remote systems have a limit on the number
+/// of concurrent connections, and file handles in the cache would be counted towards
+/// that limit.
+///
+/// If there is a file handle in the cache and the underlying file is deleted,
+/// the file handle might keep the file from being deleted at the OS level. This can
+/// take up disk space and impact correctness. To avoid this, the cache will evict any
+/// file handle that has been unused for longer than the threshold specified by
+/// `unused_handle_timeout_secs`. Eviction is disabled when the threshold is 0.
+///
+/// TODO: The cache should also evict file handles more aggressively if the file
+/// handle's mtime is older than the file's current mtime.
+template <size_t NUM_PARTITIONS>
+class FileHandleCache {
+ public:
+  /// Instantiates the cache with `capacity` split evenly across NUM_PARTITIONS
+  /// partitions. If the capacity does not split evenly, then the capacity is rounded
+  /// up. The cache will age out any file handle that is unused for
+  /// `unused_handle_timeout_secs` seconds. Aging out is disabled if this is set to zero.
+  FileHandleCache(size_t capacity, uint64_t unused_handle_timeout_secs);
+
+  /// Destructor is only called for backend tests
+  ~FileHandleCache();
+
+  /// Starts up a thread that monitors the age of file handles and evicts any that
+  /// exceed the limit.
+  Status Init() WARN_UNUSED_RESULT;
+
+  /// Get a file handle from the cache for the specified filename (fname) and
+  /// last modification time (mtime). This will hash the filename to determine
+  /// which partition to use for this file handle.
+  ///
+  /// If 'require_new_handle' is false and the partition contains an available handle,
+  /// the handle is returned and cache_hit is set to true. Otherwise, the partition will
+  /// try to construct a file handle and add it to the partition. On success, the new
+  /// file handle will be returned with cache_hit set to false. On failure, nullptr will
+  /// be returned. In either case, the partition may evict a file handle to make room
+  /// for the new file handle.
+  ///
+  /// This obtains exclusive control over the returned file handle. It must be paired
+  /// with a call to ReleaseFileHandle to release exclusive control.
+  HdfsFileHandle* GetFileHandle(const hdfsFS& fs, std::string* fname, int64_t mtime,
+      bool require_new_handle, bool* cache_hit);
+
+  /// Release the exclusive hold on the specified file handle (which was obtained
+  /// by calling GetFileHandle). The cache may evict a file handle if the cache is
+  /// above capacity. If 'destroy_handle' is true, immediately remove this handle
+  /// from the cache.
+  void ReleaseFileHandle(std::string* fname, HdfsFileHandle* fh, bool destroy_handle);
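+
+  // Checkout/return sketch (editorial illustration; 'cache' is a FileHandleCache
+  // instance, 'fname' a std::string, and error handling is elided):
+  //
+  //   bool cache_hit;
+  //   HdfsFileHandle* fh = cache.GetFileHandle(fs, &fname, mtime,
+  //       /*require_new_handle=*/false, &cache_hit);
+  //   if (fh == nullptr) return Status("Failed to open " + fname);
+  //   // ... exclusive use of fh->file() ...
+  //   cache.ReleaseFileHandle(&fname, fh, /*destroy_handle=*/false);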
+
+ private:
+  struct FileHandleEntry;
+  typedef std::multimap<std::string, FileHandleEntry> MapType;
+
+  struct LruListEntry {
+    LruListEntry(typename MapType::iterator map_entry_in);
+    typename MapType::iterator map_entry;
+    uint64_t timestamp_seconds;
+  };
+  typedef std::list<LruListEntry> LruListType;
+
+  struct FileHandleEntry {
+    FileHandleEntry(HdfsFileHandle* fh_in, LruListType& lru_list)
+    : fh(fh_in), lru_entry(lru_list.end()) {}
+    std::unique_ptr<HdfsFileHandle> fh;
+
+    /// in_use is true for a file handle checked out via GetFileHandle() that has not
+    /// been returned via ReleaseFileHandle().
+    bool in_use = false;
+
+    /// Iterator to this element's location in the LRU list. Since the LRU list only
+    /// contains file handles that are not in use, this only points to a valid location
+    /// when in_use is false. For error-checking, this is set to lru_list.end() when
+    /// in_use is true.
+    typename LruListType::iterator lru_entry;
+  };
+
+  /// Each partition operates independently, and thus has its own cache, LRU list,
+  /// and corresponding lock. To avoid contention on the lock due to false sharing,
+  /// the partitions are aligned to cache line boundaries.
+  struct FileHandleCachePartition : public CacheLineAligned {
+    /// Protects access to cache and lru_list.
+    SpinLock lock;
+
+    /// Multimap from the file name to the file handles for that file. The cache
+    /// can contain multiple file handles for the same file and some may have
+    /// different mtimes if the file is being modified. All file handles are always
+    /// owned by the cache.
+    MapType cache;
+
+    /// The LRU list only contains file handles that are not in use.
+    LruListType lru_list;
+
+    /// Maximum number of file handles in the cache without evicting unused file
+    /// handles. It is not a strict limit, and can be exceeded if all file handles
+    /// are in use.
+    size_t capacity;
+
+    /// Current number of file handles in the cache
+    size_t size;
+  };
+
+  /// Periodic check to evict unused file handles. Only executed by eviction_thread_.
+  void EvictHandlesLoop();
+  static const int64_t EVICT_HANDLES_PERIOD_MS = 1000;
+
+  /// If the partition is above its capacity, evict the oldest unused file handles to
+  /// enforce the capacity.
+  void EvictHandles(FileHandleCachePartition& p);
+
+  std::array<FileHandleCachePartition, NUM_PARTITIONS> cache_partitions_;
+
+  /// Maximum time before an unused file handle is aged out of the cache.
+  /// Aging out is disabled if this is set to 0.
+  uint64_t unused_handle_timeout_secs_;
+
+  /// Thread to check for unused file handles to evict. This thread will exit when
+  /// the shut_down_promise_ is set.
+  std::unique_ptr<Thread> eviction_thread_;
+  Promise<bool> shut_down_promise_;
+};
+}
+}
+
+#endif

