http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/io/handle-cache.inline.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/handle-cache.inline.h 
b/be/src/runtime/io/handle-cache.inline.h
new file mode 100644
index 0000000..10db49e
--- /dev/null
+++ b/be/src/runtime/io/handle-cache.inline.h
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <tuple>
+
+#include "runtime/io/handle-cache.h"
+#include "util/hash-util.h"
+#include "util/time.h"
+
+#ifndef IMPALA_RUNTIME_DISK_IO_MGR_HANDLE_CACHE_INLINE_H
+#define IMPALA_RUNTIME_DISK_IO_MGR_HANDLE_CACHE_INLINE_H
+
+namespace impala {
+namespace io {
+
+HdfsFileHandle::HdfsFileHandle(const hdfsFS& fs, const char* fname,
+    int64_t mtime)
+    : fs_(fs), hdfs_file_(hdfsOpenFile(fs, fname, O_RDONLY, 0, 0, 0)), mtime_(mtime) {
+  ImpaladMetrics::IO_MGR_NUM_CACHED_FILE_HANDLES->Increment(1L);
+  VLOG_FILE << "hdfsOpenFile() file=" << fname << " fid=" << hdfs_file_;
+}
+
+HdfsFileHandle::~HdfsFileHandle() {
+  if (hdfs_file_ != nullptr && fs_ != nullptr) {
+    ImpaladMetrics::IO_MGR_NUM_CACHED_FILE_HANDLES->Increment(-1L);
+    VLOG_FILE << "hdfsCloseFile() fid=" << hdfs_file_;
+    hdfsCloseFile(fs_, hdfs_file_);
+  }
+  fs_ = nullptr;
+  hdfs_file_ = nullptr;
+}
+
+template <size_t NUM_PARTITIONS>
+  FileHandleCache<NUM_PARTITIONS>::FileHandleCache(size_t capacity,
+      uint64_t unused_handle_timeout_secs)
+  : unused_handle_timeout_secs_(unused_handle_timeout_secs) {
+  DCHECK_GT(NUM_PARTITIONS, 0);
+  size_t remainder = capacity % NUM_PARTITIONS;
+  size_t base_capacity = capacity / NUM_PARTITIONS;
+  size_t partition_capacity = (remainder > 0 ? base_capacity + 1 : base_capacity);
+  for (FileHandleCachePartition& p : cache_partitions_) {
+    p.size = 0;
+    p.capacity = partition_capacity;
+  }
+}
+
+template <size_t NUM_PARTITIONS>
+FileHandleCache<NUM_PARTITIONS>::LruListEntry::LruListEntry(
+    typename MapType::iterator map_entry_in)
+     : map_entry(map_entry_in), timestamp_seconds(MonotonicSeconds()) {}
+
+template <size_t NUM_PARTITIONS>
+FileHandleCache<NUM_PARTITIONS>::~FileHandleCache() {
+  shut_down_promise_.Set(true);
+  if (eviction_thread_ != nullptr) eviction_thread_->Join();
+}
+
+template <size_t NUM_PARTITIONS>
+Status FileHandleCache<NUM_PARTITIONS>::Init() {
+  return Thread::Create("disk-io-mgr-handle-cache", "File Handle Timeout",
+      &FileHandleCache<NUM_PARTITIONS>::EvictHandlesLoop, this, &eviction_thread_);
+}
+
+template <size_t NUM_PARTITIONS>
+HdfsFileHandle* FileHandleCache<NUM_PARTITIONS>::GetFileHandle(
+    const hdfsFS& fs, std::string* fname, int64_t mtime, bool require_new_handle,
+    bool* cache_hit) {
+  // Hash the key and get appropriate partition
+  int index = HashUtil::Hash(fname->data(), fname->size(), 0) % NUM_PARTITIONS;
+  FileHandleCachePartition& p = cache_partitions_[index];
+  boost::lock_guard<SpinLock> g(p.lock);
+  pair<typename MapType::iterator, typename MapType::iterator> range =
+    p.cache.equal_range(*fname);
+
+  // If this requires a new handle, skip to the creation codepath. Otherwise,
+  // find an unused entry with the same mtime
+  FileHandleEntry* ret_elem = nullptr;
+  if (!require_new_handle) {
+    while (range.first != range.second) {
+      FileHandleEntry* elem = &range.first->second;
+      if (!elem->in_use && elem->fh->mtime() == mtime) {
+        // This element is currently in the lru_list, which means that lru_entry must
+        // be an iterator pointing into the lru_list.
+        DCHECK(elem->lru_entry != p.lru_list.end());
+        // Remove the element from the lru_list and designate that it is not on
+        // the lru_list by resetting its iterator to point to the end of the list.
+        p.lru_list.erase(elem->lru_entry);
+        elem->lru_entry = p.lru_list.end();
+        ret_elem = elem;
+        *cache_hit = true;
+        break;
+      }
+      ++range.first;
+    }
+  }
+
+  // There was no entry that was free or caller asked for a new handle
+  if (!ret_elem) {
+    *cache_hit = false;
+    // Create a new entry and move it into the map
+    HdfsFileHandle* new_fh = new HdfsFileHandle(fs, fname->data(), mtime);
+    if (!new_fh->ok()) {
+      delete new_fh;
+      return nullptr;
+    }
+    FileHandleEntry entry(new_fh, p.lru_list);
+    typename MapType::iterator new_it = p.cache.emplace_hint(range.second,
+        *fname, std::move(entry));
+    ret_elem = &new_it->second;
+    ++p.size;
+    if (p.size > p.capacity) EvictHandles(p);
+  }
+
+  DCHECK(ret_elem->fh.get() != nullptr);
+  DCHECK(!ret_elem->in_use);
+  ret_elem->in_use = true;
+  ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(1L);
+  return ret_elem->fh.get();
+}
+
+template <size_t NUM_PARTITIONS>
+void FileHandleCache<NUM_PARTITIONS>::ReleaseFileHandle(std::string* fname,
+    HdfsFileHandle* fh, bool destroy_handle) {
+  DCHECK(fh != nullptr);
+  // Hash the key and get appropriate partition
+  int index = HashUtil::Hash(fname->data(), fname->size(), 0) % NUM_PARTITIONS;
+  FileHandleCachePartition& p = cache_partitions_[index];
+  boost::lock_guard<SpinLock> g(p.lock);
+  pair<typename MapType::iterator, typename MapType::iterator> range =
+    p.cache.equal_range(*fname);
+
+  // TODO: This can be optimized by maintaining some state in the file handle about
+  // its location in the map.
+  typename MapType::iterator release_it = range.first;
+  while (release_it != range.second) {
+    FileHandleEntry* elem = &release_it->second;
+    if (elem->fh.get() == fh) break;
+    ++release_it;
+  }
+  DCHECK(release_it != range.second);
+
+  // This file handle is no longer referenced
+  FileHandleEntry* release_elem = &release_it->second;
+  DCHECK(release_elem->in_use);
+  release_elem->in_use = false;
+  ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(-1L);
+  if (destroy_handle) {
+    --p.size;
+    p.cache.erase(release_it);
+    return;
+  }
+  // Hdfs can use some memory for readahead buffering. Calling unbuffer reduces
+  // this buffering so that the file handle takes up less memory when in the cache.
+  // If unbuffering is not supported, then hdfsUnbufferFile() will return a non-zero
+  // return code, and we close the file handle and remove it from the cache.
+  if (hdfsUnbufferFile(release_elem->fh->file()) == 0) {
+    // This FileHandleEntry must not be in the lru list already, because it was
+    // in use. Verify this by checking that the lru_entry is pointing to the end,
+    // which cannot be true for any element in the lru list.
+    DCHECK(release_elem->lru_entry == p.lru_list.end());
+    // Add this to the lru list, establishing links in both directions.
+    // The FileHandleEntry has an iterator to the LruListEntry and the
+    // LruListEntry has an iterator to the location of the FileHandleEntry in
+    // the cache.
+    release_elem->lru_entry = p.lru_list.emplace(p.lru_list.end(), release_it);
+    if (p.size > p.capacity) EvictHandles(p);
+  } else {
+    VLOG_FILE << "FS does not support file handle unbuffering, closing file="
+              << fname;
+    --p.size;
+    p.cache.erase(release_it);
+  }
+}
+
+template <size_t NUM_PARTITIONS>
+void FileHandleCache<NUM_PARTITIONS>::EvictHandlesLoop() {
+  while (true) {
+    for (FileHandleCachePartition& p : cache_partitions_) {
+      boost::lock_guard<SpinLock> g(p.lock);
+      EvictHandles(p);
+    }
+    // This Get() will time out until shutdown, when the promise is set.
+    bool timed_out;
+    shut_down_promise_.Get(EVICT_HANDLES_PERIOD_MS, &timed_out);
+    if (!timed_out) break;
+  }
+  // The promise must be set to true.
+  DCHECK(shut_down_promise_.IsSet());
+  DCHECK(shut_down_promise_.Get());
+}
+
+template <size_t NUM_PARTITIONS>
+void FileHandleCache<NUM_PARTITIONS>::EvictHandles(
+    FileHandleCache<NUM_PARTITIONS>::FileHandleCachePartition& p) {
+  uint64_t now = MonotonicSeconds();
+  uint64_t oldest_allowed_timestamp =
+      now > unused_handle_timeout_secs_ ? now - unused_handle_timeout_secs_ : 0;
+  while (p.lru_list.size() > 0) {
+    // Peek at the oldest element
+    LruListEntry oldest_entry = p.lru_list.front();
+    typename MapType::iterator oldest_entry_map_it = oldest_entry.map_entry;
+    uint64_t oldest_entry_timestamp = oldest_entry.timestamp_seconds;
+    // If the oldest element does not need to be aged out and the cache is not over
+    // capacity, then we are done and there is nothing to evict.
+    if (p.size <= p.capacity && (unused_handle_timeout_secs_ == 0 ||
+        oldest_entry_timestamp >= oldest_allowed_timestamp)) {
+      return;
+    }
+    // Evict the oldest element
+    DCHECK(!oldest_entry_map_it->second.in_use);
+    p.cache.erase(oldest_entry_map_it);
+    p.lru_list.pop_front();
+    --p.size;
+  }
+}
+}
+}
+#endif
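
For illustration only, the borrow/release discipline implied by the API in this file looks roughly like the sketch below; the identifiers `fs`, `path`, `mtime` and the capacity/timeout values are placeholders and not part of the patch:

    // Hypothetical caller-side usage of FileHandleCache (sketch, not committed code).
    FileHandleCache<16> cache(/* capacity */ 1000, /* unused_handle_timeout_secs */ 21600);
    Status init_status = cache.Init();  // starts the periodic eviction thread

    bool cache_hit;
    HdfsFileHandle* fh = cache.GetFileHandle(
        fs, &path, mtime, /* require_new_handle */ false, &cache_hit);
    if (fh == nullptr) {
      // hdfsOpenFile() failed; handle the error.
    } else {
      // ... read via fh->file() ...
      // Every borrowed handle must be released exactly once. destroy_handle=true
      // evicts it from the cache; false returns it to the partition's LRU list.
      cache.ReleaseFileHandle(&path, fh, /* destroy_handle */ false);
    }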

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/io/request-context.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.cc 
b/be/src/runtime/io/request-context.cc
new file mode 100644
index 0000000..287f53a
--- /dev/null
+++ b/be/src/runtime/io/request-context.cc
@@ -0,0 +1,293 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/io/disk-io-mgr-internal.h"
+
+#include "common/names.h"
+
+using namespace impala;
+using namespace impala::io;
+
+void RequestContext::Cancel(const Status& status) {
+  DCHECK(!status.ok());
+
+  // Callbacks are collected in this vector and invoked while no lock is held.
+  vector<WriteRange::WriteDoneCallback> write_callbacks;
+  {
+    lock_guard<mutex> lock(lock_);
+    DCHECK(Validate()) << endl << DebugString();
+
+    // Already being cancelled
+    if (state_ == RequestContext::Cancelled) return;
+
+    DCHECK(status_.ok());
+    status_ = status;
+
+    // The reader will be put into a cancelled state until call cleanup is complete.
+    state_ = RequestContext::Cancelled;
+
+    // Cancel all scan ranges for this reader. Each range could be on one of
+    // four queues.
+    for (int i = 0; i < disk_states_.size(); ++i) {
+      RequestContext::PerDiskState& state = disk_states_[i];
+      RequestRange* range = NULL;
+      while ((range = state.in_flight_ranges()->Dequeue()) != NULL) {
+        if (range->request_type() == RequestType::READ) {
+          static_cast<ScanRange*>(range)->Cancel(status);
+        } else {
+          DCHECK(range->request_type() == RequestType::WRITE);
+          write_callbacks.push_back(static_cast<WriteRange*>(range)->callback_);
+        }
+      }
+
+      ScanRange* scan_range;
+      while ((scan_range = state.unstarted_scan_ranges()->Dequeue()) != NULL) {
+        scan_range->Cancel(status);
+      }
+      WriteRange* write_range;
+      while ((write_range = state.unstarted_write_ranges()->Dequeue()) != NULL) {
+        write_callbacks.push_back(write_range->callback_);
+      }
+    }
+
+    ScanRange* range = NULL;
+    while ((range = ready_to_start_ranges_.Dequeue()) != NULL) {
+      range->Cancel(status);
+    }
+    while ((range = blocked_ranges_.Dequeue()) != NULL) {
+      range->Cancel(status);
+    }
+    while ((range = cached_ranges_.Dequeue()) != NULL) {
+      range->Cancel(status);
+    }
+
+    // Schedule reader on all disks. The disks will notice it is cancelled and do any
+    // required cleanup
+    for (int i = 0; i < disk_states_.size(); ++i) {
+      RequestContext::PerDiskState& state = disk_states_[i];
+      state.ScheduleContext(this, i);
+    }
+  }
+
+  for (const WriteRange::WriteDoneCallback& write_callback: write_callbacks) {
+    write_callback(status_);
+  }
+
+  // Signal reader and unblock the GetNext/Read thread.  That read will fail with
+  // a cancelled status.
+  ready_to_start_ranges_cv_.NotifyAll();
+}
+
+void RequestContext::CancelAndMarkInactive() {
+  Cancel(Status::CANCELLED);
+
+  boost::unique_lock<boost::mutex> l(lock_);
+  DCHECK_NE(state_, Inactive);
+  DCHECK(Validate()) << endl << DebugString();
+
+  // Wait until the ranges finish up.
+  while (num_disks_with_ranges_ > 0) disks_complete_cond_var_.Wait(l);
+
+  // Validate that no buffers were leaked from this context.
+  DCHECK_EQ(num_buffers_in_reader_.Load(), 0) << endl << DebugString();
+  DCHECK_EQ(num_used_buffers_.Load(), 0) << endl << DebugString();
+  DCHECK(Validate()) << endl << DebugString();
+  state_ = Inactive;
+}
+
+void RequestContext::AddRequestRange(
+    RequestRange* range, bool schedule_immediately) {
+  // DCHECK(lock_.is_locked()); // TODO: boost should have this API
+  RequestContext::PerDiskState& state = disk_states_[range->disk_id()];
+  if (state.done()) {
+    DCHECK_EQ(state.num_remaining_ranges(), 0);
+    state.set_done(false);
+    ++num_disks_with_ranges_;
+  }
+
+  bool schedule_context;
+  if (range->request_type() == RequestType::READ) {
+    ScanRange* scan_range = static_cast<ScanRange*>(range);
+    if (schedule_immediately) {
+      ScheduleScanRange(scan_range);
+    } else {
+      state.unstarted_scan_ranges()->Enqueue(scan_range);
+      num_unstarted_scan_ranges_.Add(1);
+    }
+    // If next_scan_range_to_start is NULL, schedule this RequestContext so that it will
+    // be set. If it's not NULL, this context will be scheduled when GetNextRange() is
+    // invoked.
+    schedule_context = state.next_scan_range_to_start() == NULL;
+  } else {
+    DCHECK(range->request_type() == RequestType::WRITE);
+    DCHECK(!schedule_immediately);
+    WriteRange* write_range = static_cast<WriteRange*>(range);
+    state.unstarted_write_ranges()->Enqueue(write_range);
+
+    // ScheduleContext() has no effect if the context is already scheduled,
+    // so this is safe.
+    schedule_context = true;
+  }
+
+  if (schedule_context) state.ScheduleContext(this, range->disk_id());
+  ++state.num_remaining_ranges();
+}
+
+RequestContext::RequestContext(
+    DiskIoMgr* parent, int num_disks, MemTracker* tracker)
+  : parent_(parent), mem_tracker_(tracker), disk_states_(num_disks) {}
+
+// Dumps out request context information. Lock should be taken by caller
+string RequestContext::DebugString() const {
+  stringstream ss;
+  ss << endl << "  RequestContext: " << (void*)this << " (state=";
+  if (state_ == RequestContext::Inactive) ss << "Inactive";
+  if (state_ == RequestContext::Cancelled) ss << "Cancelled";
+  if (state_ == RequestContext::Active) ss << "Active";
+  if (state_ != RequestContext::Inactive) {
+    ss << " status_=" << (status_.ok() ? "OK" : status_.GetDetail())
+       << " #ready_buffers=" << num_ready_buffers_.Load()
+       << " #used_buffers=" << num_used_buffers_.Load()
+       << " #num_buffers_in_reader=" << num_buffers_in_reader_.Load()
+       << " #finished_scan_ranges=" << num_finished_ranges_.Load()
+       << " #disk_with_ranges=" << num_disks_with_ranges_
+       << " #disks=" << num_disks_with_ranges_;
+    for (int i = 0; i < disk_states_.size(); ++i) {
+      ss << endl << "   " << i << ": "
+         << "is_on_queue=" << disk_states_[i].is_on_queue()
+         << " done=" << disk_states_[i].done()
+         << " #num_remaining_scan_ranges=" << 
disk_states_[i].num_remaining_ranges()
+         << " #in_flight_ranges=" << disk_states_[i].in_flight_ranges()->size()
+         << " #unstarted_scan_ranges=" << 
disk_states_[i].unstarted_scan_ranges()->size()
+         << " #unstarted_write_ranges="
+         << disk_states_[i].unstarted_write_ranges()->size()
+         << " #reading_threads=" << disk_states_[i].num_threads_in_op();
+    }
+  }
+  ss << ")";
+  return ss.str();
+}
+
+bool RequestContext::Validate() const {
+  if (state_ == RequestContext::Inactive) {
+    LOG(WARNING) << "state_ == RequestContext::Inactive";
+    return false;
+  }
+
+  if (num_used_buffers_.Load() < 0) {
+    LOG(WARNING) << "num_used_buffers_ < 0: #used=" << 
num_used_buffers_.Load();
+    return false;
+  }
+
+  if (num_ready_buffers_.Load() < 0) {
+    LOG(WARNING) << "num_ready_buffers_ < 0: #used=" << 
num_ready_buffers_.Load();
+    return false;
+  }
+
+  int total_unstarted_ranges = 0;
+  for (int i = 0; i < disk_states_.size(); ++i) {
+    const PerDiskState& state = disk_states_[i];
+    bool on_queue = state.is_on_queue();
+    int num_reading_threads = state.num_threads_in_op();
+
+    total_unstarted_ranges += state.unstarted_scan_ranges()->size();
+
+    if (num_reading_threads < 0) {
+      LOG(WARNING) << "disk_id=" << i
+                   << "state.num_threads_in_read < 0: #threads="
+                   << num_reading_threads;
+      return false;
+    }
+
+    if (state_ != RequestContext::Cancelled) {
+      if (state.unstarted_scan_ranges()->size() + state.in_flight_ranges()->size() >
+          state.num_remaining_ranges()) {
+        LOG(WARNING) << "disk_id=" << i
+                     << " state.unstarted_ranges.size() + 
state.in_flight_ranges.size()"
+                     << " > state.num_remaining_ranges:"
+                     << " #unscheduled=" << 
state.unstarted_scan_ranges()->size()
+                     << " #in_flight=" << state.in_flight_ranges()->size()
+                     << " #remaining=" << state.num_remaining_ranges();
+        return false;
+      }
+
+      // If we have an in_flight range, the reader must be on the queue or have a
+      // thread actively reading for it.
+      if (!state.in_flight_ranges()->empty() && !on_queue && num_reading_threads == 0) {
+        LOG(WARNING) << "disk_id=" << i
+                     << " reader has inflight ranges but is not on the disk queue."
+                     << " #in_flight_ranges=" << state.in_flight_ranges()->size()
+                     << " #reading_threads=" << num_reading_threads
+                     << " on_queue=" << on_queue;
+        return false;
+      }
+
+      if (state.done() && num_reading_threads > 0) {
+        LOG(WARNING) << "disk_id=" << i
+                     << " state set to done but there are still threads 
working."
+                     << " #reading_threads=" << num_reading_threads;
+        return false;
+      }
+    } else {
+      // Is Cancelled
+      if (!state.in_flight_ranges()->empty()) {
+        LOG(WARNING) << "disk_id=" << i
+                     << "Reader cancelled but has in flight ranges.";
+        return false;
+      }
+      if (!state.unstarted_scan_ranges()->empty()) {
+        LOG(WARNING) << "disk_id=" << i
+                     << "Reader cancelled but has unstarted ranges.";
+        return false;
+      }
+    }
+
+    if (state.done() && on_queue) {
+      LOG(WARNING) << "disk_id=" << i
+                   << " state set to done but the reader is still on the disk 
queue."
+                   << " state.done=true and state.is_on_queue=true";
+      return false;
+    }
+  }
+
+  if (state_ != RequestContext::Cancelled) {
+    if (total_unstarted_ranges != num_unstarted_scan_ranges_.Load()) {
+      LOG(WARNING) << "total_unstarted_ranges=" << total_unstarted_ranges
+                   << " sum_in_states=" << num_unstarted_scan_ranges_.Load();
+      return false;
+    }
+  } else {
+    if (!ready_to_start_ranges_.empty()) {
+      LOG(WARNING) << "Reader cancelled but has ready to start ranges.";
+      return false;
+    }
+    if (!blocked_ranges_.empty()) {
+      LOG(WARNING) << "Reader cancelled but has blocked ranges.";
+      return false;
+    }
+  }
+
+  return true;
+}
+
+void RequestContext::PerDiskState::ScheduleContext(
+    RequestContext* context, int disk_id) {
+  if (!is_on_queue_ && !done_) {
+    is_on_queue_ = true;
+    context->parent_->disk_queues_[disk_id]->EnqueueContext(context);
+  }
+}
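
RequestContext::Cancel() above collects the write callbacks while holding lock_ and invokes them only after the lock is released, so a callback can safely block or re-enter the I/O manager. A minimal standalone sketch of that idiom (illustrative names, std::mutex instead of boost, a stand-in Status type):

    #include <functional>
    #include <mutex>
    #include <utility>
    #include <vector>

    struct Status {};  // stand-in for impala::Status

    class Cancellable {
     public:
      void AddCallback(std::function<void(const Status&)> cb) {
        std::lock_guard<std::mutex> l(lock_);
        pending_callbacks_.push_back(std::move(cb));
      }

      void Cancel(const Status& status) {
        std::vector<std::function<void(const Status&)>> callbacks;
        {
          std::lock_guard<std::mutex> l(lock_);
          if (cancelled_) return;              // already cancelled
          cancelled_ = true;
          callbacks.swap(pending_callbacks_);  // take ownership under the lock
        }
        // Invoke with no lock held: a callback may call back into this object
        // without risking deadlock.
        for (const auto& cb : callbacks) cb(status);
      }

     private:
      std::mutex lock_;
      bool cancelled_ = false;
      std::vector<std::function<void(const Status&)>> pending_callbacks_;
    };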

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/io/request-context.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.h 
b/be/src/runtime/io/request-context.h
new file mode 100644
index 0000000..9807805
--- /dev/null
+++ b/be/src/runtime/io/request-context.h
@@ -0,0 +1,403 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_IO_REQUEST_CONTEXT_H
+#define IMPALA_RUNTIME_IO_REQUEST_CONTEXT_H
+
+#include "runtime/io/disk-io-mgr.h"
+#include "util/condition-variable.h"
+
+namespace impala {
+namespace io {
+/// A request context is used to group together I/O requests belonging to a client of the
+/// I/O manager for management and scheduling. For most I/O manager clients it is an
+/// opaque pointer, but some clients may need to include this header, e.g. to make the
+/// unique_ptr<DiskIoRequestContext> destructor work correctly.
+///
+/// Implementation Details
+/// ======================
+/// This object maintains a lot of state that is carefully synchronized. The context
+/// maintains state across all disks as well as per disk state.
+/// The unit for an IO request is a RequestRange, which may be a ScanRange or a
+/// WriteRange.
+/// A scan range for the reader is in one of five states:
+/// 1) PerDiskState's unstarted_ranges: This range has only been queued
+///    and nothing has been read from it.
+/// 2) RequestContext's ready_to_start_ranges_: This range is about to be started.
+///    As soon as the reader picks it up, it will move to the in_flight_ranges
+///    queue.
+/// 3) PerDiskState's in_flight_ranges: This range is being processed and will
+///    be read from the next time a disk thread picks it up in GetNextRequestRange()
+/// 4) ScanRange's outgoing ready buffers is full. We can't read for this range
+///    anymore. We need the caller to pull a buffer off which will put this in
+///    the in_flight_ranges queue. These ranges are in the RequestContext's
+///    blocked_ranges_ queue.
+/// 5) ScanRange is cached and in the cached_ranges_ queue.
+//
+/// If the scan range is read and does not get blocked on the outgoing queue, the
+/// transitions are: 1 -> 2 -> 3.
+/// If the scan range does get blocked, the transitions are
+/// 1 -> 2 -> 3 -> (4 -> 3)*
+//
+/// In the case of a cached scan range, the range is immediately put in cached_ranges_.
+/// When the caller asks for the next range to process, we first pull ranges from
+/// the cached_ranges_ queue. If the range was cached, the range is removed and
+/// done (ranges are either entirely cached or not at all). If the cached read attempt
+/// fails, we put the range in state 1.
+//
+/// A write range for a context may be in one of two lists:
+/// 1) unstarted_write_ranges_ : Ranges that have been queued but not processed.
+/// 2) in_flight_ranges_: The write range is ready to be processed by the next disk thread
+///    that picks it up in GetNextRequestRange().
+//
+/// AddWriteRange() adds WriteRanges for a disk.
+/// It is the responsibility of the client to pin the data to be written via a WriteRange
+/// in memory. After a WriteRange has been written, a callback is invoked to inform the
+/// client that the write has completed.
+//
+/// An important assumption is that a write does not exceed the maximum read size and that
+/// the entire range is written when the write request is handled. (In other words, writes
+/// are not broken up.)
+//
+/// When a RequestContext is processed by a disk thread in GetNextRequestRange(),
+/// a write range is always removed from the list of unstarted write ranges and appended
+/// to the in_flight_ranges_ queue. This is done to alternate reads and writes - a read
+/// that is scheduled (by calling GetNextRange()) is always followed by a write (if one
+/// exists).  And since at most one WriteRange can be present in in_flight_ranges_ at any
+/// time (once a write range is returned from GetNextRequestRange() it is completed and
+/// not re-enqueued), a scan range scheduled via a call to GetNextRange() can be queued up
+/// behind at most one write range.
+class RequestContext {
+ public:
+  ~RequestContext() { DCHECK_EQ(state_, Inactive) << "Must be unregistered."; }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(RequestContext);
+  friend class DiskIoMgr;
+  friend class ScanRange;
+
+  class PerDiskState;
+
+  enum State {
+    /// Reader is initialized and maps to a client
+    Active,
+
+    /// Reader is in the process of being cancelled.  Cancellation is coordinated between
+    /// different threads and when they are all complete, the reader context is moved to
+    /// the inactive state.
+    Cancelled,
+
+    /// Reader context does not map to a client.  Accessing memory in this context
+    /// is invalid (i.e. it is equivalent to a dangling pointer).
+    Inactive,
+  };
+
+  RequestContext(DiskIoMgr* parent, int num_disks, MemTracker* tracker);
+
+  /// Decrements the number of active disks for this reader.  If the disk count
+  /// goes to 0, the disk complete condition variable is signaled.
+  /// Reader lock must be taken before this call.
+  void DecrementDiskRefCount() {
+    // boost doesn't let us dcheck that the reader lock is taken
+    DCHECK_GT(num_disks_with_ranges_, 0);
+    if (--num_disks_with_ranges_ == 0) {
+      disks_complete_cond_var_.NotifyAll();
+    }
+    DCHECK(Validate()) << std::endl << DebugString();
+  }
+
+  /// Reader & Disk Scheduling: Readers that currently can't do work are not on
+  /// the disk's queue. These readers are ones that don't have any ranges in the
+  /// in_flight_queue AND have not prepared a range by setting next_range_to_start.
+  /// The rule to make sure readers are scheduled correctly is to ensure anytime a
+  /// range is put on the in_flight_queue or anytime next_range_to_start is set to
+  /// NULL, the reader is scheduled.
+
+  /// Adds range to in_flight_ranges, scheduling this reader on the disk threads
+  /// if necessary.
+  /// Reader lock must be taken before this.
+  void ScheduleScanRange(ScanRange* range) {
+    DCHECK_EQ(state_, Active);
+    DCHECK(range != NULL);
+    RequestContext::PerDiskState& state = disk_states_[range->disk_id()];
+    state.in_flight_ranges()->Enqueue(range);
+    state.ScheduleContext(this, range->disk_id());
+  }
+
+  /// Cancels the context with status code 'status'
+  void Cancel(const Status& status);
+
+  /// Cancel the context if not already cancelled, wait for all scan ranges to finish
+  /// and mark the context as inactive, after which it cannot be used.
+  void CancelAndMarkInactive();
+
+  /// Adds request range to disk queue for this request context. Currently,
+  /// schedule_immediately must be false if RequestRange is a write range.
+  void AddRequestRange(RequestRange* range, bool schedule_immediately);
+
+  /// Validates invariants of reader.  Reader lock must be taken beforehand.
+  bool Validate() const;
+
+  /// Dumps out reader information.  Lock should be taken by caller
+  std::string DebugString() const;
+
+  /// Parent object
+  DiskIoMgr* const parent_;
+
+  /// Memory used for this reader.  This is unowned by this object.
+  MemTracker* const mem_tracker_;
+
+  /// Total bytes read for this reader
+  RuntimeProfile::Counter* bytes_read_counter_ = nullptr;
+
+  /// Total time spent in hdfs reading
+  RuntimeProfile::Counter* read_timer_ = nullptr;
+
+  /// Number of active read threads
+  RuntimeProfile::Counter* active_read_thread_counter_ = nullptr;
+
+  /// Disk access bitmap. The counter's bit[i] is set if disk id i has been accessed.
+  /// TODO: we can only support up to 64 disks with this bitmap but it lets us use a
+  /// builtin atomic instruction. Probably good enough for now.
+  RuntimeProfile::Counter* disks_accessed_bitmap_ = nullptr;
+
+  /// Total number of bytes read locally, updated at end of each range scan
+  AtomicInt64 bytes_read_local_{0};
+
+  /// Total number of bytes read via short circuit read, updated at end of each range scan
+  AtomicInt64 bytes_read_short_circuit_{0};
+
+  /// Total number of bytes read from data node cache, updated at end of each range scan
+  AtomicInt64 bytes_read_dn_cache_{0};
+
+  /// Total number of bytes from remote reads that were expected to be local.
+  AtomicInt64 unexpected_remote_bytes_{0};
+
+  /// The number of buffers that have been returned to the reader (via GetNext) that the
+  /// reader has not returned. Only included for debugging and diagnostics.
+  AtomicInt32 num_buffers_in_reader_{0};
+
+  /// The number of scan ranges that have been completed for this reader.
+  AtomicInt32 num_finished_ranges_{0};
+
+  /// The number of scan ranges that required a remote read, updated at the end of each
+  /// range scan. Only used for diagnostics.
+  AtomicInt32 num_remote_ranges_{0};
+
+  /// The total number of scan ranges that have not been started. Only used for
+  /// diagnostics. This is the sum of all unstarted_scan_ranges across all disks.
+  AtomicInt32 num_unstarted_scan_ranges_{0};
+
+  /// Total number of file handle opens where the file handle was present in the cache
+  AtomicInt32 cached_file_handles_hit_count_{0};
+
+  /// Total number of file handle opens where the file handle was not in the cache
+  AtomicInt32 cached_file_handles_miss_count_{0};
+
+  /// The number of buffers that are being used for this reader. This is the sum
+  /// of all buffers in ScanRange queues and buffers currently being read into (i.e. about
+  /// to be queued). This includes both IOMgr-allocated buffers and client-provided
+  /// buffers.
+  AtomicInt32 num_used_buffers_{0};
+
+  /// The total number of ready buffers across all ranges.  Ready buffers are buffers
+  /// that have been read from disk but not retrieved by the caller.
+  /// This is the sum of all queued buffers in all ranges for this reader context.
+  AtomicInt32 num_ready_buffers_{0};
+
+  /// All fields below are accessed by multiple threads and the lock needs to be
+  /// taken before accessing them. Must be acquired before ScanRange::lock_ if both
+  /// are held simultaneously.
+  boost::mutex lock_;
+
+  /// Current state of the reader
+  State state_ = Active;
+
+  /// Status of this reader.  Set to non-ok if cancelled.
+  Status status_;
+
+  /// The number of disks with scan ranges remaining (always equal to the sum of
+  /// disks with ranges).
+  int num_disks_with_ranges_ = 0;
+
+  /// This is the list of ranges that are expected to be cached on the DN.
+  /// When the reader asks for a new range (GetNextScanRange()), we first
+  /// return ranges from this list.
+  InternalQueue<ScanRange> cached_ranges_;
+
+  /// A list of ranges that should be returned in subsequent calls to
+  /// GetNextRange.
+  /// There is a trade-off with when to populate this list.  Populating it on
+  /// demand means consumers need to wait (happens in DiskIoMgr::GetNextRange()).
+  /// Populating it preemptively means we make worse scheduling decisions.
+  /// We currently populate one range per disk.
+  /// TODO: think about this some more.
+  InternalQueue<ScanRange> ready_to_start_ranges_;
+  ConditionVariable ready_to_start_ranges_cv_; // used with lock_
+
+  /// Ranges that are blocked due to back pressure on outgoing buffers.
+  InternalQueue<ScanRange> blocked_ranges_;
+
+  /// Condition variable for UnregisterContext() to wait for all disks to complete
+  ConditionVariable disks_complete_cond_var_;
+
+  /// Struct containing state per disk. See comments in the disk read loop on how
+  /// they are used.
+  class PerDiskState {
+   public:
+    bool done() const { return done_; }
+    void set_done(bool b) { done_ = b; }
+
+    int num_remaining_ranges() const { return num_remaining_ranges_; }
+    int& num_remaining_ranges() { return num_remaining_ranges_; }
+
+    ScanRange* next_scan_range_to_start() { return next_scan_range_to_start_; }
+    void set_next_scan_range_to_start(ScanRange* range) {
+      next_scan_range_to_start_ = range;
+    }
+
+    /// We need to have a memory barrier to prevent this load from being reordered
+    /// with num_threads_in_op(), since these variables are set without the reader
+    /// lock taken
+    bool is_on_queue() const {
+      bool b = is_on_queue_;
+      __sync_synchronize();
+      return b;
+    }
+
+    int num_threads_in_op() const {
+      int v = num_threads_in_op_.Load();
+      // TODO: determine whether this barrier is necessary for any callsites.
+      AtomicUtil::MemoryBarrier();
+      return v;
+    }
+
+    const InternalQueue<ScanRange>* unstarted_scan_ranges() const {
+      return &unstarted_scan_ranges_;
+    }
+    const InternalQueue<WriteRange>* unstarted_write_ranges() const {
+      return &unstarted_write_ranges_;
+    }
+    const InternalQueue<RequestRange>* in_flight_ranges() const {
+      return &in_flight_ranges_;
+    }
+
+    InternalQueue<ScanRange>* unstarted_scan_ranges() { return &unstarted_scan_ranges_; }
+    InternalQueue<WriteRange>* unstarted_write_ranges() {
+      return &unstarted_write_ranges_;
+    }
+    InternalQueue<RequestRange>* in_flight_ranges() { return &in_flight_ranges_; }
+
+    /// Schedules the request context on this disk if it's not already on the queue.
+    /// Context lock must be taken before this.
+    void ScheduleContext(RequestContext* context, int disk_id);
+
+    /// Increment the ref count on reader.  We need to track the number of threads per
+    /// reader per disk that are in the unlocked hdfs read code section. This is updated
+    /// by multiple threads without a lock so we need to use an atomic int.
+    void IncrementRequestThreadAndDequeue() {
+      num_threads_in_op_.Add(1);
+      is_on_queue_ = false;
+    }
+
+    void DecrementRequestThread() { num_threads_in_op_.Add(-1); }
+
+    /// Decrement request thread count and do final cleanup if this is the last
+    /// thread. RequestContext lock must be taken before this.
+    void DecrementRequestThreadAndCheckDone(RequestContext* context) {
+      num_threads_in_op_.Add(-1); // Also acts as a barrier.
+      if (!is_on_queue_ && num_threads_in_op_.Load() == 0 && !done_) {
+        // This thread is the last one for this reader on this disk, do final cleanup
+        context->DecrementDiskRefCount();
+        done_ = true;
+      }
+    }
+
+   private:
+    /// If true, this disk is all done for this request context, including any cleanup.
+    /// If done is true, it means that this request must not be on this disk's queue
+    /// *AND* there are no threads currently working on this context. To satisfy
+    /// this, only the last thread (per disk) can set this to true.
+    bool done_ = true;
+
+    /// For each disk, keeps track if the context is on this disk's queue, indicating
+    /// the disk must do some work for this context. The disk needs to do work in 4 cases:
+    ///  1) in_flight_ranges is not empty, the disk needs to read for this reader.
+    ///  2) next_range_to_start is NULL, the disk needs to prepare a scan range to be
+    ///     read next.
+    ///  3) the reader has been cancelled and this disk needs to participate in the
+    ///     cleanup.
+    ///  4) A write range is added to queue.
+    /// In general, we only want to put a context on the disk queue if there is something
+    /// useful that can be done. If there's nothing useful, the disk queue will wake up
+    /// and then remove the reader from the queue. Doing this causes thrashing of the
+    /// threads.
+    bool is_on_queue_ = false;
+
+    /// For each disk, the number of request ranges that have not been fully read.
+    /// In the non-cancellation path, this will hit 0, and done will be set to true
+    /// by the disk thread. This is undefined in the cancellation path (the various
+    /// threads notice by looking at the RequestContext's state_).
+    int num_remaining_ranges_ = 0;
+
+    /// Queue of ranges that have not started being read.  This list is exclusive
+    /// with in_flight_ranges.
+    InternalQueue<ScanRange> unstarted_scan_ranges_;
+
+    /// Queue of pending IO requests for this disk in the order that they will be
+    /// processed. A ScanRange is added to this queue when it is returned in
+    /// GetNextRange(), or when it is added with schedule_immediately = true.
+    /// A WriteRange is added to this queue from unstarted_write_ranges_ for each
+    /// invocation of GetNextRequestRange() in WorkLoop().
+    /// The size of this queue is always less than or equal to num_remaining_ranges.
+    InternalQueue<RequestRange> in_flight_ranges_;
+
+    /// The next range to start for this reader on this disk. Each disk (for each reader)
+    /// picks the next range to start. The range is set here and also added to the
+    /// ready_to_start_ranges_ queue. The reader pulls from the queue in FIFO order,
+    /// so the ranges from different disks are round-robined. When the range is pulled
+    /// off the ready_to_start_ranges_ queue, it sets this variable to NULL, so the disk
+    /// knows to populate it again and add it to ready_to_start_ranges_ i.e. it is used
+    /// as a flag by DiskIoMgr::GetNextScanRange to determine if it needs to add another
+    /// range to ready_to_start_ranges_.
+    ScanRange* next_scan_range_to_start_ = nullptr;
+
+    /// For each disk, the number of threads issuing the underlying read/write on behalf
+    /// of this context. There are a few places where we release the context lock, do some
+    /// work, and then grab the lock again.  Because we don't hold the lock for the
+    /// entire operation, we need this ref count to keep track of which thread should do
+    /// final resource cleanup during cancellation.
+    /// Only the thread that sees the count at 0 should do the final cleanup.
+    AtomicInt32 num_threads_in_op_{0};
+
+    /// Queue of write ranges to process for this disk. A write range is always added
+    /// to in_flight_ranges_ in GetNextRequestRange(). There is a separate
+    /// unstarted_read_ranges_ and unstarted_write_ranges_ to alternate between reads
+    /// and writes. (Otherwise, since next_scan_range_to_start is set
+    /// in GetNextRequestRange() whenever it is null, repeated calls to
+    /// GetNextRequestRange() and GetNextRange() may result in only reads being processed)
+    InternalQueue<WriteRange> unstarted_write_ranges_;
+  };
+
+  /// Per disk states to synchronize multiple disk threads accessing the same request
+  /// context.
+  std::vector<PerDiskState> disk_states_;
+};
+}
+}
+
+#endif
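
PerDiskState above pairs an atomic count of in-flight disk threads with the is_on_queue_/done_ flags so that exactly one thread performs the final per-disk cleanup. A condensed sketch of that pattern (std::atomic instead of AtomicInt32; in the real class the flag checks and updates happen with the RequestContext lock held):

    #include <atomic>
    #include <functional>

    class PerDiskWork {
     public:
      // A disk thread dequeues this context and starts operating on it.
      void BeginOp() {
        num_threads_in_op_.fetch_add(1);
        is_on_queue_ = false;
      }

      // The disk thread finishes. The thread that drops the count to zero while
      // the context is neither queued nor already done runs the cleanup exactly
      // once (DecrementDiskRefCount() plays that role in the patch).
      void EndOp(const std::function<void()>& cleanup) {
        if (num_threads_in_op_.fetch_sub(1) == 1 && !is_on_queue_ && !done_) {
          cleanup();
          done_ = true;
        }
      }

     private:
      std::atomic<int> num_threads_in_op_{0};
      bool is_on_queue_ = false;  // guarded by the context lock in the real code
      bool done_ = false;         // likewise guarded
    };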

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/io/request-ranges.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-ranges.h 
b/be/src/runtime/io/request-ranges.h
new file mode 100644
index 0000000..c1b3bbe
--- /dev/null
+++ b/be/src/runtime/io/request-ranges.h
@@ -0,0 +1,471 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_IO_REQUEST_RANGES_H
+#define IMPALA_RUNTIME_IO_REQUEST_RANGES_H
+
+#include <cstdint>
+#include <deque>
+
+#include <boost/thread/mutex.hpp>
+
+#include "common/hdfs.h"
+#include "common/status.h"
+#include "util/condition-variable.h"
+#include "util/internal-queue.h"
+
+namespace impala {
+class MemTracker;
+
+namespace io {
+class DiskIoMgr;
+class RequestContext;
+class HdfsFileHandle;
+class ScanRange;
+
+/// Buffer struct that is used by the caller and IoMgr to pass read buffers.
+/// It is expected that only one thread has ownership of this object at a
+/// time.
+class BufferDescriptor {
+ public:
+  ~BufferDescriptor() {
+    DCHECK(buffer_ == nullptr); // Check we didn't leak a buffer.
+  }
+
+  ScanRange* scan_range() { return scan_range_; }
+  uint8_t* buffer() { return buffer_; }
+  int64_t buffer_len() { return buffer_len_; }
+  int64_t len() { return len_; }
+  bool eosr() { return eosr_; }
+
+  /// Returns the offset within the scan range that this buffer starts at
+  int64_t scan_range_offset() const { return scan_range_offset_; }
+
+  /// Transfer ownership of buffer memory from 'mem_tracker_' to 'dst' and set
+  /// 'mem_tracker_' to 'dst'. 'mem_tracker_' and 'dst' must be non-NULL. Does not
+  /// check memory limits on 'dst': the caller should check the memory limit if a
+  /// different memory limit may apply to 'dst'. If the buffer was a client-provided
+  /// buffer, transferring is not allowed.
+  /// TODO: IMPALA-3209: revisit this as part of scanner memory usage revamp.
+  void TransferOwnership(MemTracker* dst);
+
+ private:
+  friend class DiskIoMgr;
+  friend class ScanRange;
+  friend class RequestContext;
+
+  /// Create a buffer descriptor for a new reader, range and data buffer. The buffer
+  /// memory should already be accounted against 'mem_tracker'.
+  BufferDescriptor(DiskIoMgr* io_mgr, RequestContext* reader,
+      ScanRange* scan_range, uint8_t* buffer, int64_t buffer_len,
+      MemTracker* mem_tracker);
+
+  /// Return true if this is a cached buffer owned by HDFS.
+  bool is_cached() const;
+
+  /// Return true if this is a buffer owned by the client that was provided when
+  /// constructing the scan range.
+  bool is_client_buffer() const;
+
+  DiskIoMgr* const io_mgr_;
+
+  /// Reader that this buffer is for.
+  RequestContext* const reader_;
+
+  /// The current tracker this buffer is associated with. After initialisation,
+  /// NULL for cached buffers and non-NULL for all other buffers.
+  MemTracker* mem_tracker_;
+
+  /// Scan range that this buffer is for. Non-NULL when initialised.
+  ScanRange* const scan_range_;
+
+  /// buffer with the read contents
+  uint8_t* buffer_;
+
+  /// length of buffer_. For buffers from cached reads, the length is 0.
+  const int64_t buffer_len_;
+
+  /// length of read contents
+  int64_t len_ = 0;
+
+  /// true if the current scan range is complete
+  bool eosr_ = false;
+
+  /// Status of the read to this buffer. if status is not ok, 'buffer' is nullptr
+  Status status_;
+
+  int64_t scan_range_offset_ = 0;
+};
+
+/// The request type, read or write associated with a request range.
+struct RequestType {
+  enum type {
+    READ,
+    WRITE,
+  };
+};
+
+/// Represents a contiguous sequence of bytes in a single file.
+/// This is the common base class for read and write IO requests - ScanRange and
+/// WriteRange. Each disk thread processes exactly one RequestRange at a time.
+class RequestRange : public InternalQueue<RequestRange>::Node {
+ public:
+  hdfsFS fs() const { return fs_; }
+  const char* file() const { return file_.c_str(); }
+  std::string* file_string() { return &file_; }
+  int64_t offset() const { return offset_; }
+  int64_t len() const { return len_; }
+  int disk_id() const { return disk_id_; }
+  RequestType::type request_type() const { return request_type_; }
+
+ protected:
+  RequestRange(RequestType::type request_type)
+    : fs_(nullptr), offset_(-1), len_(-1), disk_id_(-1), request_type_(request_type) {}
+
+  /// Hadoop filesystem that contains file_, or set to nullptr for local filesystem.
+  hdfsFS fs_;
+
+  /// Path to file being read or written.
+  std::string file_;
+
+  /// Offset within file_ being read or written.
+  int64_t offset_;
+
+  /// Length of data read or written.
+  int64_t len_;
+
+  /// Id of disk containing byte range.
+  int disk_id_;
+
+  /// The type of IO request, READ or WRITE.
+  RequestType::type request_type_;
+};
+
+/// Param struct for different combinations of buffering.
+struct BufferOpts {
+ public:
+  // Set options for a read into an IoMgr-allocated or HDFS-cached buffer. Caching is
+  // enabled if 'try_cache' is true, the file is in the HDFS cache and 'mtime' matches
+  // the modified time of the cached file in the HDFS cache.
+  BufferOpts(bool try_cache, int64_t mtime)
+    : try_cache_(try_cache),
+      mtime_(mtime),
+      client_buffer_(nullptr),
+      client_buffer_len_(-1) {}
+
+  /// Set options for an uncached read into an IoMgr-allocated buffer.
+  static BufferOpts Uncached() {
+    return BufferOpts(false, NEVER_CACHE, nullptr, -1);
+  }
+
+  /// Set options to read the entire scan range into 'client_buffer'. The length of the
+  /// buffer, 'client_buffer_len', must fit the entire scan range. HDFS caching is not
+  /// enabled in this case.
+  static BufferOpts ReadInto(uint8_t* client_buffer, int64_t client_buffer_len) {
+    return BufferOpts(false, NEVER_CACHE, client_buffer, client_buffer_len);
+  }
+
+ private:
+  friend class ScanRange;
+
+  BufferOpts(
+      bool try_cache, int64_t mtime, uint8_t* client_buffer, int64_t client_buffer_len)
+    : try_cache_(try_cache),
+      mtime_(mtime),
+      client_buffer_(client_buffer),
+      client_buffer_len_(client_buffer_len) {}
+
+  /// If 'mtime_' is set to NEVER_CACHE, the file handle will never be cached, because
+  /// the modification time won't match.
+  const static int64_t NEVER_CACHE = -1;
+
+  /// If true, read from HDFS cache if possible.
+  const bool try_cache_;
+
+  /// Last modified time of the file associated with the scan range. If set to
+  /// NEVER_CACHE, caching is disabled.
+  const int64_t mtime_;
+
+  /// A destination buffer provided by the client, nullptr and -1 if no buffer.
+  uint8_t* const client_buffer_;
+  const int64_t client_buffer_len_;
+};
+
+/// ScanRange description. The caller must call Reset() to initialize the fields
+/// before calling AddScanRanges(). The private fields are used internally by
+/// the IoMgr.
+class ScanRange : public RequestRange {
+ public:
+  ScanRange();
+
+  virtual ~ScanRange();
+
+  /// Resets this scan range object with the scan range description. The scan range
+  /// is for bytes [offset, offset + len) in 'file' on 'fs' (which is nullptr for the
+  /// local filesystem). The scan range must fall within the file bounds (offset >= 0
+  /// and offset + len <= file_length). 'disk_id' is the disk queue to add the range
+  /// to. If 'expected_local' is true, a warning is generated if the read did not
+  /// come from a local disk. 'buffer_opts' specifies buffer management options -
+  /// see the DiskIoMgr class comment and the BufferOpts comments for details.
+  /// 'meta_data' is an arbitrary client-provided pointer for any auxiliary data.
+  void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
+      bool expected_local, const BufferOpts& buffer_opts, void* meta_data = nullptr);
+
+  void* meta_data() const { return meta_data_; }
+  bool try_cache() const { return try_cache_; }
+  bool expected_local() const { return expected_local_; }
+
+  /// Returns the next buffer for this scan range. buffer is an output parameter.
+  /// This function blocks until a buffer is ready or an error occurred. If this is
+  /// called when all buffers have been returned, *buffer is set to nullptr and Status::OK
+  /// is returned.
+  /// Only one thread can be in GetNext() at any time.
+  Status GetNext(std::unique_ptr<BufferDescriptor>* buffer) WARN_UNUSED_RESULT;
+
+  /// Cancel this scan range. This cleans up all queued buffers and
+  /// wakes up any threads blocked on GetNext().
+  /// Status is the reason the range was cancelled. Must not be ok().
+  /// Status is returned to the user in GetNext().
+  void Cancel(const Status& status);
+
+  /// return a descriptive string for debug.
+  std::string DebugString() const;
+
+  int64_t mtime() const { return mtime_; }
+
+ private:
+  friend class BufferDescriptor;
+  friend class DiskIoMgr;
+  friend class RequestContext;
+
+  /// Initialize internal fields
+  void InitInternal(DiskIoMgr* io_mgr, RequestContext* reader);
+
+  /// Enqueues a buffer for this range. This does not block.
+  /// Returns true if this scan range has hit the queue capacity, false otherwise.
+  /// The caller passes ownership of buffer to the scan range and it is not
+  /// valid to access buffer after this call. The reader lock must be held by the
+  /// caller.
+  bool EnqueueBuffer(const boost::unique_lock<boost::mutex>& reader_lock,
+      std::unique_ptr<BufferDescriptor> buffer);
+
+  /// Cleanup any queued buffers (i.e. due to cancellation). This cannot
+  /// be called with any locks taken.
+  void CleanupQueuedBuffers();
+
+  /// Validates the internal state of this range. lock_ must be taken
+  /// before calling this.
+  bool Validate();
+
+  /// Maximum length in bytes for hdfsRead() calls.
+  int64_t MaxReadChunkSize() const;
+
+  /// Opens the file for this range. This function only modifies state in this range.
+  /// If 'use_file_handle_cache' is true and this is a local hdfs file, then this scan
+  /// range will not maintain an exclusive file handle. It will borrow an hdfs file
+  /// handle from the file handle cache for each Read(), so Open() does nothing.
+  /// If 'use_file_handle_cache' is false or this is a remote hdfs file or this is
+  /// a local OS file, Open() will maintain a file handle on the scan range for
+  /// exclusive use by this scan range. An exclusive hdfs file handle still comes
+  /// from the cache, but it is a newly opened file handle that is held for the
+  /// entire duration of a scan range's lifetime and destroyed in Close().
+  /// All local OS files are opened using normal OS file APIs.
+  Status Open(bool use_file_handle_cache) WARN_UNUSED_RESULT;
+
+  /// Closes the file for this range. This function only modifies state in this range.
+  void Close();
+
+  /// Reads from this range into 'buffer', which has length 'buffer_len' bytes. Returns
+  /// the number of bytes read. The read position in this scan range is updated.
+  Status Read(uint8_t* buffer, int64_t buffer_len, int64_t* bytes_read,
+      bool* eosr) WARN_UNUSED_RESULT;
+
+  /// Get the read statistics from the Hdfs file handle and aggregate them to
+  /// the RequestContext. This clears the statistics on this file handle.
+  /// It is safe to pass hdfsFile by value, as hdfsFile's underlying type is a
+  /// pointer.
+  void GetHdfsStatistics(hdfsFile fh);
+
+  /// Reads from the DN cache. On success, sets cached_buffer_ to the DN buffer
+  /// and *read_succeeded to true.
+  /// If the data is not cached, returns ok() and *read_succeeded is set to false.
+  /// Returns a non-ok status if it ran into a non-continuable error.
+  ///  The reader lock must be held by the caller.
+  Status ReadFromCache(const boost::unique_lock<boost::mutex>& reader_lock,
+      bool* read_succeeded) WARN_UNUSED_RESULT;
+
+  /// Pointer to caller specified metadata. This is untouched by the io manager
+  /// and the caller can put whatever auxiliary data in here.
+  void* meta_data_ = nullptr;
+
+  /// If true, this scan range is expected to be cached. Note that this might be wrong
+  /// since the block could have been uncached. In that case, the cached path
+  /// will fail and we'll just put the scan range on the normal read path.
+  bool try_cache_ = false;
+
+  /// If true, we expect this scan range to be a local read. Note that if this is false,
+  /// it does not necessarily mean we expect the read to be remote, and that we never
+  /// create scan ranges where some of the range is expected to be remote and some of it
+  /// local.
+  /// TODO: we can do more with this
+  bool expected_local_ = false;
+
+  /// Total number of bytes read remotely. This is necessary to maintain a count of
+  /// the number of remote scan ranges. Since IO statistics can be collected multiple
+  /// times for a scan range, it is necessary to keep some state about whether this
+  /// scan range has already been counted as remote. There is also a requirement to
+  /// log the number of unexpected remote bytes for a scan range. To solve both
+  /// requirements, maintain num_remote_bytes_ on the ScanRange and push it to the
+  /// reader_ once at the close of the scan range.
+  int64_t num_remote_bytes_;
+
+  DiskIoMgr* io_mgr_ = nullptr;
+
+  /// Reader/owner of the scan range
+  RequestContext* reader_ = nullptr;
+
+  /// File handle either to hdfs or local fs (FILE*)
+  /// The hdfs file handle is only stored here in three cases:
+  /// 1. The file handle cache is off (max_cached_file_handles == 0).
+  /// 2. The scan range is using hdfs caching.
+  /// -OR-
+  /// 3. The hdfs file is expected to be remote (expected_local_ == false)
+  /// In each case, the scan range gets a new file handle from the file handle cache
+  /// at Open(), holds it exclusively, and destroys it in Close().
+  union {
+    FILE* local_file_ = nullptr;
+    HdfsFileHandle* exclusive_hdfs_fh_;
+  };
+
+  /// Tagged union that holds a buffer for the cases when there is a buffer allocated
+  /// externally from DiskIoMgr that is associated with the scan range.
+  enum class ExternalBufferTag { CLIENT_BUFFER, CACHED_BUFFER, NO_BUFFER };
+  ExternalBufferTag external_buffer_tag_;
+  union {
+    /// Valid if the 'external_buffer_tag_' is CLIENT_BUFFER.
+    struct {
+      /// Client-provided buffer to read the whole scan range into.
+      uint8_t* data;
+
+      /// Length of the client-provided buffer.
+      int64_t len;
+    } client_buffer_;
+
+    /// Valid and non-NULL if the external_buffer_tag_ is CACHED_BUFFER, which means
+    /// that a cached read succeeded and all the bytes for the range are in this buffer.
+    struct hadoopRzBuffer* cached_buffer_ = nullptr;
+  };
+
+  /// Lock protecting fields below.
+  /// This lock should not be taken during Open()/Read()/Close().
+  /// If RequestContext::lock_ and this lock need to be held simultaneously,
+  /// RequestContext::lock_ must be taken first.
+  boost::mutex lock_;
+
+  /// Number of bytes read so far for this scan range
+  int bytes_read_;
+
+  /// Status for this range. This is non-ok if is_cancelled_ is true.
+  /// Note: an individual range can fail without the RequestContext being
+  /// cancelled. This allows us to skip individual ranges.
+  Status status_;
+
+  /// If true, the last buffer for this scan range has been queued.
+  bool eosr_queued_ = false;
+
+  /// If true, the last buffer for this scan range has been returned.
+  bool eosr_returned_ = false;
+
+  /// If true, this scan range has been removed from the reader's in_flight_ranges
+  /// queue because the ready_buffers_ queue is full.
+  bool blocked_on_queue_ = false;
+
+  /// IO buffers that are queued for this scan range.
+  /// Condition variable for GetNext
+  ConditionVariable buffer_ready_cv_;
+  std::deque<std::unique_ptr<BufferDescriptor>> ready_buffers_;
+
+  /// Lock that should be taken during hdfs calls. Only one thread (the disk reading
+  /// thread) calls into hdfs at a time so this lock does not have performance impact.
+  /// This lock only serves to coordinate cleanup. Specifically it serves to ensure
+  /// that the disk threads are finished with HDFS calls before is_cancelled_ is set
+  /// to true and cleanup starts.
+  /// If this lock and lock_ need to be taken, lock_ must be taken first.
+  boost::mutex hdfs_lock_;
+
+  /// If true, this scan range has been cancelled.
+  bool is_cancelled_ = false;
+
+  /// Last modified time of the file associated with the scan range
+  int64_t mtime_;
+};
+
+/// Used to specify data to be written to a file and offset.
+/// It is the responsibility of the client to ensure that the data to be written is
+/// valid and that the file to be written to exists until the callback is invoked.
+/// A callback is invoked to inform the client when the write is done.
+class WriteRange : public RequestRange {
+ public:
+  /// This callback is invoked on each WriteRange after the write is complete or the
+  /// context is cancelled. The status returned by the callback parameter indicates
+  /// if the write was successful (i.e. Status::OK), if there was an error
+  /// (TStatusCode::RUNTIME_ERROR) or if the context was cancelled
+  /// (TStatusCode::CANCELLED). The callback is only invoked if this WriteRange was
+  /// successfully added (i.e. AddWriteRange() succeeded). No locks are held while
+  /// the callback is invoked.
+  typedef std::function<void(const Status&)> WriteDoneCallback;
+  WriteRange(const std::string& file, int64_t file_offset, int disk_id,
+      WriteDoneCallback callback);
+
+  /// Change the file and offset of this write range. Data and callbacks are unchanged.
+  /// Can only be called when the write is not in flight (i.e. before AddWriteRange()
+  /// is called or after the write callback was called).
+  void SetRange(const std::string& file, int64_t file_offset, int disk_id);
+
+  /// Set the data and number of bytes to be written for this WriteRange.
+  /// Can only be called when the write is not in flight (i.e. before AddWriteRange()
+  /// is called or after the write callback was called).
+  void SetData(const uint8_t* buffer, int64_t len);
+
+  const uint8_t* data() const { return data_; }
+
+ private:
+  friend class DiskIoMgr;
+  friend class RequestContext;
+  friend class ScanRange;
+
+  /// Data to be written. RequestRange::len_ contains the length of data
+  /// to be written.
+  const uint8_t* data_;
+
+  /// Callback to invoke after the write is complete.
+  WriteDoneCallback callback_;
+};
+
+inline bool BufferDescriptor::is_cached() const {
+  return scan_range_->external_buffer_tag_
+      == ScanRange::ExternalBufferTag::CACHED_BUFFER;
+}
+
+inline bool BufferDescriptor::is_client_buffer() const {
+  return scan_range_->external_buffer_tag_
+      == ScanRange::ExternalBufferTag::CLIENT_BUFFER;
+}
+}
+}
+
+#endif
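
A minimal usage sketch of the WriteRange contract documented above (not part of the
patch): the PendingWrite wrapper and the exact AddWriteRange() signature are assumptions
made for illustration; only the WriteRange constructor, SetData() and WriteDoneCallback
come from this header. The owner must keep the range, its data and the target file alive
until the callback fires, as the class comment requires.

#include <cstdint>
#include <string>
#include <utility>
#include <vector>

#include "common/logging.h"
#include "common/status.h"
#include "runtime/io/disk-io-mgr.h"

namespace example {

// Hypothetical wrapper that owns the WriteRange and its payload for the full duration
// of the write, which is exactly the lifetime the WriteRange comment demands.
class PendingWrite {
 public:
  PendingWrite(const std::string& path, int64_t offset, std::vector<uint8_t> payload)
    : payload_(std::move(payload)),
      range_(path, offset, /* disk_id */ 0,
          [](const impala::Status& s) {
            // Runs after the write completes or the context is cancelled; no locks held.
            if (!s.ok()) LOG(WARNING) << "write failed: " << s.GetDetail();
          }) {
    range_.SetData(payload_.data(), payload_.size());
  }

  // AddWriteRange() is only referenced by the comments in this header, so the
  // signature used here is an assumption.
  impala::Status Queue(impala::io::DiskIoMgr* io_mgr, impala::io::RequestContext* writer) {
    return io_mgr->AddWriteRange(writer, &range_);
  }

 private:
  std::vector<uint8_t> payload_;
  impala::io::WriteRange range_;
};

}  // namespace example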

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/io/scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
new file mode 100644
index 0000000..b7655a8
--- /dev/null
+++ b/be/src/runtime/io/scan-range.cc
@@ -0,0 +1,593 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/io/disk-io-mgr.h"
+#include "runtime/io/disk-io-mgr-internal.h"
+#include "util/error-util.h"
+#include "util/hdfs-util.h"
+
+#include "common/names.h"
+
+using namespace impala;
+using namespace impala::io;
+
+DEFINE_bool(use_hdfs_pread, false, "Enables using hdfsPread() instead of hdfsRead() "
+    "when performing HDFS read operations. This is necessary to use HDFS hedged reads "
+    "(assuming the HDFS client is configured to do so).");
+
+// TODO: Run perf tests and empirically settle on the most optimal default value for the
+// read buffer size. Currently setting it as 128k for the same reason as for S3, i.e.
+// due to JNI array allocation and memcpy overhead, 128k was empirically found to have the
+// least overhead.
+DEFINE_int64(adls_read_chunk_size, 128 * 1024, "The maximum read chunk size to use when "
+    "reading from ADLS.");
+
+// Implementation of the ScanRange functionality. Each ScanRange contains a queue
+// of ready buffers. For each ScanRange, there is only a single producer and
+// consumer thread, i.e. only one disk thread will push to a scan range at
+// any time and only one thread will remove from the queue. This is to guarantee
+// that buffers are queued and read in file order.
+
+bool ScanRange::EnqueueBuffer(
+    const unique_lock<mutex>& reader_lock, unique_ptr<BufferDescriptor> buffer) {
+  DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
+  {
+    unique_lock<mutex> scan_range_lock(lock_);
+    DCHECK(Validate()) << DebugString();
+    DCHECK(!eosr_returned_);
+    DCHECK(!eosr_queued_);
+    if (is_cancelled_) {
+      // Return the buffer, this range has been cancelled
+      if (buffer->buffer_ != nullptr) {
+        io_mgr_->num_buffers_in_readers_.Add(1);
+        reader_->num_buffers_in_reader_.Add(1);
+      }
+      reader_->num_used_buffers_.Add(-1);
+      io_mgr_->ReturnBuffer(move(buffer));
+      return false;
+    }
+    reader_->num_ready_buffers_.Add(1);
+    eosr_queued_ = buffer->eosr();
+    ready_buffers_.emplace_back(move(buffer));
+
+    DCHECK_LE(ready_buffers_.size(), DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT);
+    blocked_on_queue_ = ready_buffers_.size() == DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT;
+  }
+
+  buffer_ready_cv_.NotifyOne();
+
+  return blocked_on_queue_;
+}
+
+Status ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) {
+  DCHECK(*buffer == nullptr);
+  bool eosr;
+  {
+    unique_lock<mutex> scan_range_lock(lock_);
+    if (eosr_returned_) return Status::OK();
+    DCHECK(Validate()) << DebugString();
+
+    while (ready_buffers_.empty() && !is_cancelled_) {
+      buffer_ready_cv_.Wait(scan_range_lock);
+    }
+
+    if (is_cancelled_) {
+      DCHECK(!status_.ok());
+      return status_;
+    }
+
+    // Remove the first ready buffer from the queue and return it
+    DCHECK(!ready_buffers_.empty());
+    DCHECK_LE(ready_buffers_.size(), DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT);
+    *buffer = move(ready_buffers_.front());
+    ready_buffers_.pop_front();
+    eosr_returned_ = (*buffer)->eosr();
+    eosr = (*buffer)->eosr();
+  }
+
+  // Update tracking counters. The buffer has now moved from the IoMgr to the
+  // caller.
+  io_mgr_->num_buffers_in_readers_.Add(1);
+  reader_->num_buffers_in_reader_.Add(1);
+  reader_->num_ready_buffers_.Add(-1);
+  reader_->num_used_buffers_.Add(-1);
+  if (eosr) reader_->num_finished_ranges_.Add(1);
+
+  Status status = (*buffer)->status_;
+  if (!status.ok()) {
+    io_mgr_->ReturnBuffer(move(*buffer));
+    return status;
+  }
+
+  unique_lock<mutex> reader_lock(reader_->lock_);
+
+  DCHECK(reader_->Validate()) << endl << reader_->DebugString();
+  if (reader_->state_ == RequestContext::Cancelled) {
+    reader_->blocked_ranges_.Remove(this);
+    Cancel(reader_->status_);
+    io_mgr_->ReturnBuffer(move(*buffer));
+    return status_;
+  }
+
+  {
+    // Check to see if we can re-schedule a blocked range. Note that EnqueueBuffer()
+    // may have been called after we released 'lock_' above so we need to re-check
+    // whether the queue is full.
+    unique_lock<mutex> scan_range_lock(lock_);
+    if (blocked_on_queue_
+        && ready_buffers_.size() < DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT
+        && !eosr_queued_) {
+      blocked_on_queue_ = false;
+      // This scan range was blocked and is no longer, add it to the reader
+      // queue again.
+      reader_->blocked_ranges_.Remove(this);
+      reader_->ScheduleScanRange(this);
+    }
+  }
+  return Status::OK();
+}
+
+void ScanRange::Cancel(const Status& status) {
+  // Cancelling a range that was never started, ignore.
+  if (io_mgr_ == nullptr) return;
+
+  DCHECK(!status.ok());
+  {
+    // Grab both locks to make sure that all working threads see is_cancelled_.
+    unique_lock<mutex> scan_range_lock(lock_);
+    unique_lock<mutex> hdfs_lock(hdfs_lock_);
+    DCHECK(Validate()) << DebugString();
+    if (is_cancelled_) return;
+    is_cancelled_ = true;
+    status_ = status;
+  }
+  buffer_ready_cv_.NotifyAll();
+  CleanupQueuedBuffers();
+
+  // For cached buffers, we can't close the range until the cached buffer is returned.
+  // Close() is called from DiskIoMgr::ReturnBuffer().
+  if (external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) Close();
+}
+
+void ScanRange::CleanupQueuedBuffers() {
+  DCHECK(is_cancelled_);
+  io_mgr_->num_buffers_in_readers_.Add(ready_buffers_.size());
+  reader_->num_buffers_in_reader_.Add(ready_buffers_.size());
+  reader_->num_used_buffers_.Add(-ready_buffers_.size());
+  reader_->num_ready_buffers_.Add(-ready_buffers_.size());
+
+  while (!ready_buffers_.empty()) {
+    io_mgr_->ReturnBuffer(move(ready_buffers_.front()));
+    ready_buffers_.pop_front();
+  }
+}
+
+string ScanRange::DebugString() const {
+  stringstream ss;
+  ss << "file=" << file_ << " disk_id=" << disk_id_ << " offset=" << offset_
+     << " len=" << len_ << " bytes_read=" << bytes_read_
+     << " buffer_queue=" << ready_buffers_.size()
+     << " hdfs_file=" << exclusive_hdfs_fh_;
+  return ss.str();
+}
+
+bool ScanRange::Validate() {
+  if (bytes_read_ > len_) {
+    LOG(WARNING) << "Bytes read tracking is wrong. Shouldn't read past the 
scan range."
+                 << " bytes_read_=" << bytes_read_ << " len_=" << len_;
+    return false;
+  }
+  if (eosr_returned_ && !eosr_queued_) {
+    LOG(WARNING) << "Returned eosr to reader before finishing reading the scan 
range"
+                 << " eosr_returned_=" << eosr_returned_
+                 << " eosr_queued_=" << eosr_queued_;
+    return false;
+  }
+  return true;
+}
+
+ScanRange::ScanRange()
+  : RequestRange(RequestType::READ),
+    num_remote_bytes_(0),
+    external_buffer_tag_(ExternalBufferTag::NO_BUFFER),
+    mtime_(-1) {}
+
+ScanRange::~ScanRange() {
+  DCHECK(exclusive_hdfs_fh_ == nullptr) << "File was not closed.";
+  DCHECK(external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER)
+      << "Cached buffer was not released.";
+}
+
+void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
+    int disk_id, bool expected_local, const BufferOpts& buffer_opts, void* meta_data) {
+  DCHECK(ready_buffers_.empty());
+  DCHECK(file != nullptr);
+  DCHECK_GE(len, 0);
+  DCHECK_GE(offset, 0);
+  DCHECK(buffer_opts.client_buffer_ == nullptr ||
+         buffer_opts.client_buffer_len_ >= len_);
+  fs_ = fs;
+  file_ = file;
+  len_ = len;
+  offset_ = offset;
+  disk_id_ = disk_id;
+  try_cache_ = buffer_opts.try_cache_;
+  mtime_ = buffer_opts.mtime_;
+  expected_local_ = expected_local;
+  num_remote_bytes_ = 0;
+  meta_data_ = meta_data;
+  if (buffer_opts.client_buffer_ != nullptr) {
+    external_buffer_tag_ = ExternalBufferTag::CLIENT_BUFFER;
+    client_buffer_.data = buffer_opts.client_buffer_;
+    client_buffer_.len = buffer_opts.client_buffer_len_;
+  } else {
+    external_buffer_tag_ = ExternalBufferTag::NO_BUFFER;
+  }
+  io_mgr_ = nullptr;
+  reader_ = nullptr;
+  exclusive_hdfs_fh_ = nullptr;
+}
+
+void ScanRange::InitInternal(DiskIoMgr* io_mgr, RequestContext* reader) {
+  DCHECK(exclusive_hdfs_fh_ == nullptr);
+  DCHECK(local_file_ == nullptr);
+  // Reader must provide MemTracker or a buffer.
+  DCHECK(external_buffer_tag_ == ExternalBufferTag::CLIENT_BUFFER
+      || reader->mem_tracker_ != nullptr);
+  io_mgr_ = io_mgr;
+  reader_ = reader;
+  local_file_ = nullptr;
+  exclusive_hdfs_fh_ = nullptr;
+  bytes_read_ = 0;
+  is_cancelled_ = false;
+  eosr_queued_= false;
+  eosr_returned_= false;
+  blocked_on_queue_ = false;
+  DCHECK(Validate()) << DebugString();
+}
+
+Status ScanRange::Open(bool use_file_handle_cache) {
+  unique_lock<mutex> hdfs_lock(hdfs_lock_);
+  if (is_cancelled_) return Status::CANCELLED;
+
+  if (fs_ != nullptr) {
+    if (exclusive_hdfs_fh_ != nullptr) return Status::OK();
+    // With file handle caching, the scan range does not maintain its own
+    // hdfs file handle. File handle caching is only used for local files,
+    // so s3 and remote filesystems should obtain an exclusive file handle
+    // for each scan range.
+    if (use_file_handle_cache && expected_local_) return Status::OK();
+    // Get a new exclusive file handle.
+    exclusive_hdfs_fh_ = io_mgr_->GetCachedHdfsFileHandle(fs_, file_string(),
+        mtime(), reader_, true);
+    if (exclusive_hdfs_fh_ == nullptr) {
+      return Status(TErrorCode::DISK_IO_ERROR,
+          GetHdfsErrorMsg("Failed to open HDFS file ", file_));
+    }
+
+    if (hdfsSeek(fs_, exclusive_hdfs_fh_->file(), offset_) != 0) {
+      // Destroy the file handle and remove it from the cache.
+      io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), exclusive_hdfs_fh_, true);
+      exclusive_hdfs_fh_ = nullptr;
+      return Status(TErrorCode::DISK_IO_ERROR,
+          Substitute("Error seeking to $0 in file: $1 $2", offset_, file_,
+          GetHdfsErrorMsg("")));
+    }
+  } else {
+    if (local_file_ != nullptr) return Status::OK();
+
+    local_file_ = fopen(file(), "r");
+    if (local_file_ == nullptr) {
+      return Status(TErrorCode::DISK_IO_ERROR, Substitute("Could not open 
file: $0: $1",
+            file_, GetStrErrMsg()));
+    }
+    if (fseek(local_file_, offset_, SEEK_SET) == -1) {
+      fclose(local_file_);
+      local_file_ = nullptr;
+      return Status(TErrorCode::DISK_IO_ERROR, Substitute("Could not seek to 
$0 "
+          "for file: $1: $2", offset_, file_, GetStrErrMsg()));
+    }
+  }
+  if (ImpaladMetrics::IO_MGR_NUM_OPEN_FILES != nullptr) {
+    ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(1L);
+  }
+  return Status::OK();
+}
+
+void ScanRange::Close() {
+  unique_lock<mutex> hdfs_lock(hdfs_lock_);
+  bool closed_file = false;
+  if (fs_ != nullptr) {
+    if (exclusive_hdfs_fh_ != nullptr) {
+      GetHdfsStatistics(exclusive_hdfs_fh_->file());
+
+      if (external_buffer_tag_ == ExternalBufferTag::CACHED_BUFFER) {
+        hadoopRzBufferFree(exclusive_hdfs_fh_->file(), cached_buffer_);
+        cached_buffer_ = nullptr;
+        external_buffer_tag_ = ExternalBufferTag::NO_BUFFER;
+      }
+
+      // Destroy the file handle and remove it from the cache.
+      io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), exclusive_hdfs_fh_, 
true);
+      exclusive_hdfs_fh_ = nullptr;
+      closed_file = true;
+    }
+
+    if (FLAGS_use_hdfs_pread) {
+      // Update Hedged Read Metrics.
+      // We call it only if the --use_hdfs_pread flag is set, to avoid having the
+      // libhdfs client malloc and free a hdfsHedgedReadMetrics object unnecessarily
+      // otherwise. 'hedged_metrics' is only set upon success.
+      struct hdfsHedgedReadMetrics* hedged_metrics;
+      int success = hdfsGetHedgedReadMetrics(fs_, &hedged_metrics);
+      if (success == 0) {
+        ImpaladMetrics::HEDGED_READ_OPS->set_value(hedged_metrics->hedgedReadOps);
+        ImpaladMetrics::HEDGED_READ_OPS_WIN->set_value(hedged_metrics->hedgedReadOpsWin);
+        hdfsFreeHedgedReadMetrics(hedged_metrics);
+      }
+    }
+
+    if (num_remote_bytes_ > 0) {
+      reader_->num_remote_ranges_.Add(1L);
+      if (expected_local_) {
+        reader_->unexpected_remote_bytes_.Add(num_remote_bytes_);
+        VLOG_FILE << "Unexpected remote HDFS read of "
+                  << PrettyPrinter::Print(num_remote_bytes_, TUnit::BYTES)
+                  << " for file '" << file_ << "'";
+      }
+    }
+  } else {
+    if (local_file_ == nullptr) return;
+    fclose(local_file_);
+    local_file_ = nullptr;
+    closed_file = true;
+  }
+  if (closed_file && ImpaladMetrics::IO_MGR_NUM_OPEN_FILES != nullptr) {
+    ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(-1L);
+  }
+}
+
+int64_t ScanRange::MaxReadChunkSize() const {
+  // S3 InputStreams don't support DIRECT_READ (i.e. java.nio.ByteBuffer read()
+  // interface).  So, hdfsRead() needs to allocate a Java byte[] and copy the data out.
+  // Profiles show that both the JNI array allocation and the memcpy add much more
+  // overhead for larger buffers, so limit the size of each read request.  128K was
+  // chosen empirically by trying values between 4K and 8M and optimizing for lower CPU
+  // utilization and higher S3 throughput.
+  if (disk_id_ == io_mgr_->RemoteS3DiskId()) {
+    DCHECK(IsS3APath(file()));
+    return 128 * 1024;
+  }
+  if (disk_id_ == io_mgr_->RemoteAdlsDiskId()) {
+    DCHECK(IsADLSPath(file()));
+    return FLAGS_adls_read_chunk_size;
+  }
+  // The length argument of hdfsRead() is an int. Ensure we don't overflow it.
+  return numeric_limits<int>::max();
+}
+
+// TODO: how do we best use the disk here.  e.g. is it good to break up a
+// 1MB read into 8 128K reads?
+// TODO: look at linux disk scheduling
+Status ScanRange::Read(
+    uint8_t* buffer, int64_t buffer_len, int64_t* bytes_read, bool* eosr) {
+  unique_lock<mutex> hdfs_lock(hdfs_lock_);
+  if (is_cancelled_) return Status::CANCELLED;
+
+  *eosr = false;
+  *bytes_read = 0;
+  // Read until the end of the scan range or the end of the buffer.
+  int bytes_to_read = min(len_ - bytes_read_, buffer_len);
+  DCHECK_GE(bytes_to_read, 0);
+
+  if (fs_ != nullptr) {
+    HdfsFileHandle* borrowed_hdfs_fh = nullptr;
+    hdfsFile hdfs_file;
+
+    // If the scan range has an exclusive file handle, use it. Otherwise, borrow
+    // a file handle from the cache.
+    if (exclusive_hdfs_fh_ != nullptr) {
+      hdfs_file = exclusive_hdfs_fh_->file();
+    } else {
+      borrowed_hdfs_fh = io_mgr_->GetCachedHdfsFileHandle(fs_, file_string(),
+          mtime(), reader_, false);
+      if (borrowed_hdfs_fh == nullptr) {
+        return Status(TErrorCode::DISK_IO_ERROR,
+            GetHdfsErrorMsg("Failed to open HDFS file ", file_));
+      }
+      hdfs_file = borrowed_hdfs_fh->file();
+    }
+
+    int64_t max_chunk_size = MaxReadChunkSize();
+    Status status = Status::OK();
+    while (*bytes_read < bytes_to_read) {
+      int chunk_size = min(bytes_to_read - *bytes_read, max_chunk_size);
+      DCHECK_GE(chunk_size, 0);
+      // The hdfsRead() length argument is an int.
+      DCHECK_LE(chunk_size, numeric_limits<int>::max());
+      int current_bytes_read = -1;
+      // bytes_read_ is only updated after the while loop
+      int64_t position_in_file = offset_ + bytes_read_ + *bytes_read;
+      int num_retries = 0;
+      while (true) {
+        status = Status::OK();
+        // For file handles from the cache, any of the below file operations may fail
+        // due to a bad file handle. In each case, record the error, but allow for a
+        // retry to fix it.
+        if (FLAGS_use_hdfs_pread) {
+          current_bytes_read = hdfsPread(fs_, hdfs_file, position_in_file,
+              buffer + *bytes_read, chunk_size);
+          if (current_bytes_read == -1) {
+            status = Status(TErrorCode::DISK_IO_ERROR,
+                GetHdfsErrorMsg("Error reading from HDFS file: ", file_));
+          }
+        } else {
+          // If the file handle is borrowed, it may not be at the appropriate
+          // location. Seek to the appropriate location.
+          bool seek_failed = false;
+          if (borrowed_hdfs_fh != nullptr) {
+            if (hdfsSeek(fs_, hdfs_file, position_in_file) != 0) {
+              status = Status(TErrorCode::DISK_IO_ERROR, Substitute("Error seeking to $0 "
+                  " in file: $1: $2", position_in_file, file_, GetHdfsErrorMsg("")));
+              seek_failed = true;
+            }
+          }
+          if (!seek_failed) {
+            current_bytes_read = hdfsRead(fs_, hdfs_file, buffer + *bytes_read,
+                chunk_size);
+            if (current_bytes_read == -1) {
+              status = Status(TErrorCode::DISK_IO_ERROR,
+                  GetHdfsErrorMsg("Error reading from HDFS file: ", file_));
+            }
+          }
+        }
+
+        // Do not retry:
+        // - if read was successful (current_bytes_read != -1)
+        // - or if already retried once
+        // - or if this is not using a borrowed file handle
+        DCHECK_LE(num_retries, 1);
+        if (current_bytes_read != -1 || borrowed_hdfs_fh == nullptr ||
+            num_retries == 1) {
+          break;
+        }
+        // The error may be due to a bad file handle. Reopen the file handle and retry.
+        ++num_retries;
+        RETURN_IF_ERROR(io_mgr_->ReopenCachedHdfsFileHandle(fs_, file_string(),
+            mtime(), &borrowed_hdfs_fh));
+        hdfs_file = borrowed_hdfs_fh->file();
+      }
+      if (!status.ok()) break;
+      if (current_bytes_read == 0) {
+        // No more bytes in the file. The scan range went past the end.
+        *eosr = true;
+        break;
+      }
+      *bytes_read += current_bytes_read;
+
+      // Collect and accumulate statistics
+      GetHdfsStatistics(hdfs_file);
+    }
+
+    if (borrowed_hdfs_fh != nullptr) {
+      io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), borrowed_hdfs_fh, false);
+    }
+    if (!status.ok()) return status;
+  } else {
+    DCHECK(local_file_ != nullptr);
+    *bytes_read = fread(buffer, 1, bytes_to_read, local_file_);
+    DCHECK_GE(*bytes_read, 0);
+    DCHECK_LE(*bytes_read, bytes_to_read);
+    if (*bytes_read < bytes_to_read) {
+      if (ferror(local_file_) != 0) {
+        return Status(TErrorCode::DISK_IO_ERROR, Substitute("Error reading from $0 "
+            "at byte offset: $1: $2", file_, offset_ + bytes_read_, GetStrErrMsg()));
+      } else {
+        // On Linux, we should only get partial reads from block devices on error or eof.
+        DCHECK(feof(local_file_) != 0);
+        *eosr = true;
+      }
+    }
+  }
+  bytes_read_ += *bytes_read;
+  DCHECK_LE(bytes_read_, len_);
+  if (bytes_read_ == len_) *eosr = true;
+  return Status::OK();
+}
+
+Status ScanRange::ReadFromCache(
+    const unique_lock<mutex>& reader_lock, bool* read_succeeded) {
+  DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
+  DCHECK(try_cache_);
+  DCHECK_EQ(bytes_read_, 0);
+  *read_succeeded = false;
+  Status status = Open(false);
+  if (!status.ok()) return status;
+
+  // Cached reads not supported on local filesystem.
+  if (fs_ == nullptr) return Status::OK();
+
+  {
+    unique_lock<mutex> hdfs_lock(hdfs_lock_);
+    if (is_cancelled_) return Status::CANCELLED;
+
+    DCHECK(exclusive_hdfs_fh_ != nullptr);
+    DCHECK(external_buffer_tag_ == ExternalBufferTag::NO_BUFFER);
+    cached_buffer_ =
+      hadoopReadZero(exclusive_hdfs_fh_->file(), io_mgr_->cached_read_options_, len());
+    if (cached_buffer_ != nullptr) {
+      external_buffer_tag_ = ExternalBufferTag::CACHED_BUFFER;
+    }
+  }
+  // Data was not cached, caller will fall back to normal read path.
+  if (external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) {
+    VLOG_QUERY << "Cache read failed for scan range: " << DebugString()
+               << ". Switching to disk read path.";
+    // Clean up the scan range state before re-issuing it.
+    Close();
+    return Status::OK();
+  }
+
+  // Cached read returned a buffer, verify we read the correct amount of data.
+  void* buffer = const_cast<void*>(hadoopRzBufferGet(cached_buffer_));
+  int32_t bytes_read = hadoopRzBufferLength(cached_buffer_);
+  // A partial read can happen when files are truncated.
+  // TODO: If HDFS ever supports partially cached blocks, we'll have to distinguish
+  // between errors and partially cached blocks here.
+  if (bytes_read < len()) {
+    VLOG_QUERY << "Error reading file from HDFS cache: " << file_ << ". 
Expected "
+      << len() << " bytes, but read " << bytes_read << ". Switching to disk 
read path.";
+    // Close the scan range. 'read_succeeded' is still false, so the caller 
will fall back
+    // to non-cached read of this scan range.
+    Close();
+    return Status::OK();
+  }
+
+  // Create a single buffer desc for the entire scan range and enqueue that.
+  // 'mem_tracker' is nullptr because the memory is owned by the HDFS java client,
+  // not the Impala backend.
+  unique_ptr<BufferDescriptor> desc = unique_ptr<BufferDescriptor>(new BufferDescriptor(
+      io_mgr_, reader_, this, reinterpret_cast<uint8_t*>(buffer), 0, nullptr));
+  desc->len_ = bytes_read;
+  desc->scan_range_offset_ = 0;
+  desc->eosr_ = true;
+  bytes_read_ = bytes_read;
+  EnqueueBuffer(reader_lock, move(desc));
+  if (reader_->bytes_read_counter_ != nullptr) {
+    COUNTER_ADD(reader_->bytes_read_counter_, bytes_read);
+  }
+  *read_succeeded = true;
+  reader_->num_used_buffers_.Add(1);
+  return Status::OK();
+}
+
+void ScanRange::GetHdfsStatistics(hdfsFile hdfs_file) {
+  struct hdfsReadStatistics* stats;
+  if (IsHdfsPath(file())) {
+    int success = hdfsFileGetReadStatistics(hdfs_file, &stats);
+    if (success == 0) {
+      reader_->bytes_read_local_.Add(stats->totalLocalBytesRead);
+      reader_->bytes_read_short_circuit_.Add(stats->totalShortCircuitBytesRead);
+      reader_->bytes_read_dn_cache_.Add(stats->totalZeroCopyBytesRead);
+      if (stats->totalLocalBytesRead != stats->totalBytesRead) {
+        num_remote_bytes_ += stats->totalBytesRead - stats->totalLocalBytesRead;
+      }
+      hdfsFileFreeReadStatistics(stats);
+    }
+    hdfsFileClearReadStatistics(hdfs_file);
+  }
+}
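
For readers following the retry logic in ScanRange::Read() above, this is a minimal,
self-contained sketch of the policy it implements (ReadWithRetry and its functor
parameters are illustrative names, not part of the patch): a read through a borrowed
cached file handle gets exactly one retry after the handle is reopened, while reads
through an exclusive handle, or reads that have already retried once, fail immediately.

#include "common/status.h"

// 'reopen' stands in for ReopenCachedHdfsFileHandle() and 'do_read' for the
// hdfsPread()/hdfsRead() call (plus the optional hdfsSeek()) in ScanRange::Read().
template <typename Reopen, typename DoRead>
impala::Status ReadWithRetry(bool borrowed_handle, Reopen reopen, DoRead do_read) {
  int num_retries = 0;
  while (true) {
    impala::Status status = do_read();
    // Do not retry on success, with an exclusive handle, or after one retry.
    if (status.ok() || !borrowed_handle || num_retries == 1) return status;
    ++num_retries;
    RETURN_IF_ERROR(reopen());  // replace the possibly-bad cached handle, then retry
  }
}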

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index d246024..2c08f30 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -29,7 +29,7 @@
 #include "kudu/util/slice.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/descriptors.h"
-#include "runtime/disk-io-mgr.h"
+#include "runtime/io/disk-io-mgr.h"
 #include "runtime/mem-pool.h"
 
 namespace kudu {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 308b2c4..37219cc 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -260,7 +260,7 @@ CatalogServiceClientCache* RuntimeState::catalogd_client_cache() {
   return exec_env_->catalogd_client_cache();
 }
 
-DiskIoMgr* RuntimeState::io_mgr() {
+io::DiskIoMgr* RuntimeState::io_mgr() {
   return exec_env_->disk_io_mgr();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 74c27e5..4eb3e10 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -35,7 +35,6 @@ namespace impala {
 class BufferPool;
 class DataStreamRecvr;
 class DescriptorTbl;
-class DiskIoMgr;
 class Expr;
 class LlvmCodeGen;
 class MemTracker;
@@ -53,6 +52,10 @@ class TPlanFragmentCtx;
 class TPlanFragmentInstanceCtx;
 class QueryState;
 
+namespace io {
+  class DiskIoMgr;
+}
+
 /// TODO: move the typedefs into a separate .h (and fix the includes for that)
 
 /// Counts how many rows an INSERT query has added to a particular partition
@@ -124,7 +127,7 @@ class RuntimeState {
   HBaseTableFactory* htable_factory();
   ImpalaBackendClientCache* impalad_client_cache();
   CatalogServiceClientCache* catalogd_client_cache();
-  DiskIoMgr* io_mgr();
+  io::DiskIoMgr* io_mgr();
   MemTracker* instance_mem_tracker() { return instance_mem_tracker_.get(); }
   MemTracker* query_mem_tracker();  // reference to the query_state_'s memtracker
   ReservationTracker* instance_buffer_reservation() {
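
The runtime-state.h hunk above shows the pattern this change applies to headers that
only pass the moved class around by pointer: forward-declare it inside namespace io
rather than including runtime/io/disk-io-mgr.h. A hypothetical header following the same
convention might look like this (ExampleUser and the include guard are made up for
illustration):

#ifndef IMPALA_EXAMPLE_USES_IO_MGR_H
#define IMPALA_EXAMPLE_USES_IO_MGR_H

namespace impala {

namespace io {
  class DiskIoMgr;  // forward declaration; the full definition is only needed in the .cc
}

class ExampleUser {
 public:
  explicit ExampleUser(io::DiskIoMgr* io_mgr) : io_mgr_(io_mgr) {}
  io::DiskIoMgr* io_mgr() const { return io_mgr_; }

 private:
  io::DiskIoMgr* io_mgr_;  // not owned
};

} // namespace impala

#endif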

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/test-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.h b/be/src/runtime/test-env.h
index e721510..5fb9a1c 100644
--- a/be/src/runtime/test-env.h
+++ b/be/src/runtime/test-env.h
@@ -18,7 +18,7 @@
 #ifndef IMPALA_RUNTIME_TEST_ENV
 #define IMPALA_RUNTIME_TEST_ENV
 
-#include "runtime/disk-io-mgr.h"
+#include "runtime/io/disk-io-mgr.h"
 #include "runtime/exec-env.h"
 #include "runtime/fragment-instance-state.h"
 #include "runtime/mem-tracker.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/tmp-file-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index dde6348..fbc0a36 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -47,6 +47,8 @@ DECLARE_int32(stress_scratch_write_delay_ms);
 
 namespace impala {
 
+using namespace io;
+
 class TmpFileMgrTest : public ::testing::Test {
  public:
   virtual void SetUp() {
@@ -130,7 +132,7 @@ class TmpFileMgrTest : public ::testing::Test {
     group->next_allocation_index_ = value;
   }
 
-  /// Helper to cancel the FileGroup DiskIoRequestContext.
+  /// Helper to cancel the FileGroup RequestContext.
   static void CancelIoContext(TmpFileMgr::FileGroup* group) {
     group->io_mgr_->CancelContext(group->io_ctx_.get());
   }
@@ -404,7 +406,7 @@ TEST_F(TmpFileMgrTest, TestScratchRangeRecycling) {
       std::iota(data[i].begin(), data[i].end(), i);
     }
 
-    DiskIoMgr::WriteRange::WriteDoneCallback callback =
+    WriteRange::WriteDoneCallback callback =
         bind(mem_fn(&TmpFileMgrTest::SignalCallback), this, _1);
     vector<unique_ptr<TmpFileMgr::WriteHandle>> handles(BLOCKS);
     // 'file_group' should allocate extra scratch bytes for this 'alloc_size'.
@@ -449,7 +451,7 @@ TEST_F(TmpFileMgrTest, TestProcessMemLimitExceeded) {
   CancelIoContext(&file_group);
 
   // After this error, writing via the file group should fail.
-  DiskIoMgr::WriteRange::WriteDoneCallback callback =
+  WriteRange::WriteDoneCallback callback =
       bind(mem_fn(&TmpFileMgrTest::SignalCallback), this, _1);
   unique_ptr<TmpFileMgr::WriteHandle> handle;
   Status status = file_group.Write(MemRange(data.data(), DATA_SIZE), callback, &handle);
@@ -483,7 +485,7 @@ TEST_F(TmpFileMgrTest, TestEncryptionDuringCancellation) {
 
   // Start a write in flight, which should encrypt the data and write it to disk.
   unique_ptr<TmpFileMgr::WriteHandle> handle;
-  DiskIoMgr::WriteRange::WriteDoneCallback callback =
+  WriteRange::WriteDoneCallback callback =
       bind(mem_fn(&TmpFileMgrTest::SignalCallback), this, _1);
   ASSERT_OK(file_group.Write(data_mem_range, callback, &handle));
   string file_path = handle->TmpFilePath();

