IMPALA-6121: remove I/O mgr request context cache

This simplifies the lifecycle of the request contexts and eliminates
some code. The comments claim that request context cache improves
performance when allocating smallish the objects. But allocating
from TCMalloc's thread caches should scale much better than a
global object pool protected by a lock.

I needed to move the definition to a non-internal header file so that it
was visible to clients that manage it by unique_ptr.

We also do not need to transfer the request contexts to the RuntimeState
since I/O buffers do not leave scanners now.

Testing:
Ran exhaustive tests.

Change-Id: I91414eceaa4938fccd74686fe6bebede6ef36108
Reviewed-on: http://gerrit.cloudera.org:8080/8408
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/87fc463e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/87fc463e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/87fc463e

Branch: refs/heads/master
Commit: 87fc463e0a98ce7402f89817ed847e77005b0ddb
Parents: e98d2f1
Author: Tim Armstrong <[email protected]>
Authored: Fri Oct 27 11:41:21 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Wed Nov 15 22:05:10 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-scan-node-base.cc           |  52 +--
 be/src/exec/hdfs-scan-node-base.h            |   4 +-
 be/src/exec/hdfs-scan-node-mt.cc             |   2 +-
 be/src/exec/hdfs-scan-node.cc                |   9 +-
 be/src/runtime/disk-io-mgr-internal.h        | 409 +---------------------
 be/src/runtime/disk-io-mgr-reader-context.cc |  73 ++--
 be/src/runtime/disk-io-mgr-reader-context.h  | 406 +++++++++++++++++++++
 be/src/runtime/disk-io-mgr-stress.cc         |  17 +-
 be/src/runtime/disk-io-mgr-test.cc           | 298 ++++++++--------
 be/src/runtime/disk-io-mgr.cc                | 116 +-----
 be/src/runtime/disk-io-mgr.h                 |  33 +-
 be/src/runtime/runtime-state.cc              |  14 -
 be/src/runtime/runtime-state.h               |  15 -
 be/src/runtime/tmp-file-mgr-test.cc          |   2 +-
 be/src/runtime/tmp-file-mgr.cc               |  12 +-
 be/src/runtime/tmp-file-mgr.h                |   3 +-
 16 files changed, 653 insertions(+), 812 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/87fc463e/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc 
b/be/src/exec/hdfs-scan-node-base.cc
index af01552..8ec76e0 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -30,6 +30,12 @@
 #include <gutil/strings/substitute.h>
 
 #include "codegen/llvm-codegen.h"
+#include "common/logging.h"
+#include "common/object-pool.h"
+#include "exprs/scalar-expr.h"
+#include "exprs/scalar-expr-evaluator.h"
+#include "runtime/descriptors.h"
+#include "runtime/disk-io-mgr-reader-context.h"
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/runtime-state.h"
@@ -324,7 +330,7 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
         partition_desc->partition_key_value_evals(), scan_node_pool_.get(), 
state);
   }
 
-  runtime_state_->io_mgr()->RegisterContext(&reader_context_, mem_tracker());
+  reader_context_ = runtime_state_->io_mgr()->RegisterContext(mem_tracker());
 
   // Initialize HdfsScanNode specific counters
   // TODO: Revisit counters and move the counters specific to multi-threaded 
scans
@@ -344,12 +350,13 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
   num_scanner_threads_started_counter_ =
       ADD_COUNTER(runtime_profile(), NUM_SCANNER_THREADS_STARTED, TUnit::UNIT);
 
-  runtime_state_->io_mgr()->set_bytes_read_counter(reader_context_, 
bytes_read_counter());
-  runtime_state_->io_mgr()->set_read_timer(reader_context_, read_timer());
-  runtime_state_->io_mgr()->set_active_read_thread_counter(reader_context_,
-      &active_hdfs_read_thread_counter_);
-  runtime_state_->io_mgr()->set_disks_access_bitmap(reader_context_,
-      &disks_accessed_bitmap_);
+  runtime_state_->io_mgr()->set_bytes_read_counter(
+      reader_context_.get(), bytes_read_counter());
+  runtime_state_->io_mgr()->set_read_timer(reader_context_.get(), 
read_timer());
+  runtime_state_->io_mgr()->set_active_read_thread_counter(
+      reader_context_.get(), &active_hdfs_read_thread_counter_);
+  runtime_state_->io_mgr()->set_disks_access_bitmap(
+      reader_context_.get(), &disks_accessed_bitmap_);
 
   average_scanner_thread_concurrency_ = runtime_profile()->AddSamplingCounter(
       AVERAGE_SCANNER_THREAD_CONCURRENCY, &active_scanner_thread_counter_);
@@ -393,14 +400,10 @@ Status HdfsScanNodeBase::Reset(RuntimeState* state) {
 void HdfsScanNodeBase::Close(RuntimeState* state) {
   if (is_closed()) return;
 
-  if (reader_context_ != NULL) {
-    // There may still be io buffers used by parent nodes so we can't 
unregister the
-    // reader context yet. The runtime state keeps a list of all the reader 
contexts and
-    // they are unregistered when the fragment is closed.
-    state->AcquireReaderContext(reader_context_);
+  if (reader_context_ != nullptr) {
     // Need to wait for all the active scanner threads to finish to ensure 
there is no
     // more memory tracked by this scan node's mem tracker.
-    state->io_mgr()->CancelContext(reader_context_, true);
+    state->io_mgr()->UnregisterContext(reader_context_.get());
   }
 
   StopAndFinalizeCounters();
@@ -512,9 +515,9 @@ DiskIoMgr::ScanRange* 
HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char*
       DiskIoMgr::BufferOpts(try_cache, mtime), original_split);
 }
 
-Status HdfsScanNodeBase::AddDiskIoRanges(const vector<DiskIoMgr::ScanRange*>& 
ranges,
-    int num_files_queued) {
-  RETURN_IF_ERROR(runtime_state_->io_mgr()->AddScanRanges(reader_context_, 
ranges));
+Status HdfsScanNodeBase::AddDiskIoRanges(
+    const vector<DiskIoMgr::ScanRange*>& ranges, int num_files_queued) {
+  
RETURN_IF_ERROR(runtime_state_->io_mgr()->AddScanRanges(reader_context_.get(), 
ranges));
   num_unqueued_files_.Add(-num_files_queued);
   DCHECK_GE(num_unqueued_files_.Load(), 0);
   return Status::OK();
@@ -808,20 +811,21 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
   runtime_profile()->AppendExecOption(
       Substitute("Codegen enabled: $0 out of $1", num_enabled, total));
 
-  if (reader_context_ != NULL) {
-    
bytes_read_local_->Set(runtime_state_->io_mgr()->bytes_read_local(reader_context_));
+  if (reader_context_ != nullptr) {
+    bytes_read_local_->Set(
+        runtime_state_->io_mgr()->bytes_read_local(reader_context_.get()));
     bytes_read_short_circuit_->Set(
-        runtime_state_->io_mgr()->bytes_read_short_circuit(reader_context_));
+        
runtime_state_->io_mgr()->bytes_read_short_circuit(reader_context_.get()));
     bytes_read_dn_cache_->Set(
-        runtime_state_->io_mgr()->bytes_read_dn_cache(reader_context_));
+        runtime_state_->io_mgr()->bytes_read_dn_cache(reader_context_.get()));
     num_remote_ranges_->Set(static_cast<int64_t>(
-        runtime_state_->io_mgr()->num_remote_ranges(reader_context_)));
+        runtime_state_->io_mgr()->num_remote_ranges(reader_context_.get())));
     unexpected_remote_bytes_->Set(
-        runtime_state_->io_mgr()->unexpected_remote_bytes(reader_context_));
+        
runtime_state_->io_mgr()->unexpected_remote_bytes(reader_context_.get()));
     cached_file_handles_hit_count_->Set(
-        
runtime_state_->io_mgr()->cached_file_handles_hit_count(reader_context_));
+        
runtime_state_->io_mgr()->cached_file_handles_hit_count(reader_context_.get()));
     cached_file_handles_miss_count_->Set(
-        
runtime_state_->io_mgr()->cached_file_handles_miss_count(reader_context_));
+        
runtime_state_->io_mgr()->cached_file_handles_miss_count(reader_context_.get()));
 
     if (unexpected_remote_bytes_->value() >= 
UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD) {
       runtime_state_->LogError(ErrorMsg(TErrorCode::GENERAL, Substitute(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/87fc463e/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h 
b/be/src/exec/hdfs-scan-node-base.h
index 6a0abde..e6b2154 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -154,7 +154,7 @@ class HdfsScanNodeBase : public ScanNode {
   const HdfsTableDescriptor* hdfs_table() const { return hdfs_table_; }
   const AvroSchemaElement& avro_schema() const { return *avro_schema_.get(); }
   int skip_header_line_count() const { return skip_header_line_count_; }
-  DiskIoRequestContext* reader_context() const { return reader_context_; }
+  DiskIoRequestContext* reader_context() const { return reader_context_.get(); 
}
   bool optimize_parquet_count_star() const { return 
optimize_parquet_count_star_; }
   int parquet_count_star_slot_offset() const { return 
parquet_count_star_slot_offset_; }
 
@@ -336,7 +336,7 @@ class HdfsScanNodeBase : public ScanNode {
   const int parquet_count_star_slot_offset_;
 
   /// RequestContext object to use with the disk-io-mgr for reads.
-  DiskIoRequestContext* reader_context_ = nullptr;
+  std::unique_ptr<DiskIoRequestContext> reader_context_;
 
   /// Descriptor for tuples this scan node constructs
   const TupleDescriptor* tuple_desc_ = nullptr;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/87fc463e/be/src/exec/hdfs-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index 6803b69..7ea4d80 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -77,7 +77,7 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* 
row_batch, bool* e
       scanner_.reset();
     }
     RETURN_IF_ERROR(
-        runtime_state_->io_mgr()->GetNextRange(reader_context_, &scan_range_));
+        runtime_state_->io_mgr()->GetNextRange(reader_context_.get(), 
&scan_range_));
     if (scan_range_ == NULL) {
       *eos = true;
       StopAndFinalizeCounters();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/87fc463e/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 8be191f..78f2ffa 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -254,7 +254,7 @@ void 
HdfsScanNode::AddMaterializedRowBatch(unique_ptr<RowBatch> row_batch) {
 Status HdfsScanNode::AddDiskIoRanges(const vector<DiskIoMgr::ScanRange*>& 
ranges,
     int num_files_queued) {
   RETURN_IF_ERROR(
-      runtime_state_->io_mgr()->AddScanRanges(reader_context_, ranges));
+      runtime_state_->io_mgr()->AddScanRanges(reader_context_.get(), ranges));
   num_unqueued_files_.Add(-num_files_queued);
   DCHECK_GE(num_unqueued_files_.Load(), 0);
   if (!ranges.empty()) ThreadTokenAvailableCb(runtime_state_->resource_pool());
@@ -428,7 +428,8 @@ void HdfsScanNode::ScannerThread() {
     // TODO: the Load() acts as an acquire barrier.  Is this needed? (i.e. any 
earlier
     // stores that need to complete?)
     AtomicUtil::MemoryBarrier();
-    Status status = runtime_state_->io_mgr()->GetNextRange(reader_context_, 
&scan_range);
+    Status status =
+        runtime_state_->io_mgr()->GetNextRange(reader_context_.get(), 
&scan_range);
 
     if (status.ok() && scan_range != NULL) {
       // Got a scan range. Process the range end to end (in this thread).
@@ -548,8 +549,8 @@ Status HdfsScanNode::ProcessSplit(const 
vector<FilterContext>& filter_ctxs,
 void HdfsScanNode::SetDoneInternal() {
   if (done_) return;
   done_ = true;
-  if (reader_context_ != NULL) {
-    runtime_state_->io_mgr()->CancelContext(reader_context_);
+  if (reader_context_ != nullptr) {
+    runtime_state_->io_mgr()->CancelContext(reader_context_.get());
   }
   materialized_row_batches_->Shutdown();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/87fc463e/be/src/runtime/disk-io-mgr-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-internal.h 
b/be/src/runtime/disk-io-mgr-internal.h
index 138f3f0..cc50af7 100644
--- a/be/src/runtime/disk-io-mgr-internal.h
+++ b/be/src/runtime/disk-io-mgr-internal.h
@@ -18,13 +18,13 @@
 #ifndef IMPALA_RUNTIME_DISK_IO_MGR_INTERNAL_H
 #define IMPALA_RUNTIME_DISK_IO_MGR_INTERNAL_H
 
-#include "disk-io-mgr.h"
+#include <unistd.h>
 #include <queue>
 #include <boost/thread/locks.hpp>
-#include <unistd.h>
 #include <gutil/strings/substitute.h>
 
 #include "common/logging.h"
+#include "runtime/disk-io-mgr-reader-context.h"
 #include "runtime/disk-io-mgr.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/thread-resource-mgr.h"
@@ -32,12 +32,12 @@
 #include "util/cpu-info.h"
 #include "util/debug-util.h"
 #include "util/disk-info.h"
-#include "util/hdfs-util.h"
 #include "util/filesystem-util.h"
+#include "util/hdfs-util.h"
 #include "util/impalad-metrics.h"
 
-/// This file contains internal structures to the IoMgr. Users of the IoMgr do
-/// not need to include this file.
+/// This file contains internal structures shared between submodules of the 
IoMgr. Users
+/// of the IoMgr do not need to include this file.
 namespace impala {
 
 /// Per disk state
@@ -69,405 +69,8 @@ struct DiskIoMgr::DiskQueue {
     work_available.NotifyAll();
   }
 
-  DiskQueue(int id) : disk_id(id) { }
+  DiskQueue(int id) : disk_id(id) {}
 };
-
-/// Internal per request-context state. This object maintains a lot of state 
that is
-/// carefully synchronized. The context maintains state across all disks as 
well as
-/// per disk state.
-/// The unit for an IO request is a RequestRange, which may be a ScanRange or a
-/// WriteRange.
-/// A scan range for the reader is on one of five states:
-/// 1) PerDiskState's unstarted_ranges: This range has only been queued
-///    and nothing has been read from it.
-/// 2) DiskIoRequestContext's ready_to_start_ranges_: This range is about to 
be started.
-///    As soon as the reader picks it up, it will move to the in_flight_ranges
-///    queue.
-/// 3) PerDiskState's in_flight_ranges: This range is being processed and will
-///    be read from the next time a disk thread picks it up in 
GetNextRequestRange()
-/// 4) ScanRange's outgoing ready buffers is full. We can't read for this range
-///    anymore. We need the caller to pull a buffer off which will put this in
-///    the in_flight_ranges queue. These ranges are in the 
DiskIoRequestContext's
-///    blocked_ranges_ queue.
-/// 5) ScanRange is cached and in the cached_ranges_ queue.
-//
-/// If the scan range is read and does not get blocked on the outgoing queue, 
the
-/// transitions are: 1 -> 2 -> 3.
-/// If the scan range does get blocked, the transitions are
-/// 1 -> 2 -> 3 -> (4 -> 3)*
-//
-/// In the case of a cached scan range, the range is immediately put in 
cached_ranges_.
-/// When the caller asks for the next range to process, we first pull ranges 
from
-/// the cache_ranges_ queue. If the range was cached, the range is removed and
-/// done (ranges are either entirely cached or not at all). If the cached read 
attempt
-/// fails, we put the range in state 1.
-//
-/// A write range for a context may be in one of two lists:
-/// 1) unstarted_write_ranges_ : Ranges that have been queued but not 
processed.
-/// 2) in_flight_ranges_: The write range is ready to be processed by the next 
disk thread
-///    that picks it up in GetNextRequestRange().
-//
-/// AddWriteRange() adds WriteRanges for a disk.
-/// It is the responsibility of the client to pin the data to be written via a 
WriteRange
-/// in memory. After a WriteRange has been written, a callback is invoked to 
inform the
-/// client that the write has completed.
-//
-/// An important assumption is that write does not exceed the maximum read 
size and that
-/// the entire range is written when the write request is handled. (In other 
words, writes
-/// are not broken up.)
-//
-/// When a DiskIoRequestContext is processed by a disk thread in 
GetNextRequestRange(),
-/// a write range is always removed from the list of unstarted write ranges 
and appended
-/// to the in_flight_ranges_ queue. This is done to alternate reads and writes 
- a read
-/// that is scheduled (by calling GetNextRange()) is always followed by a 
write (if one
-/// exists).  And since at most one WriteRange can be present in 
in_flight_ranges_ at any
-/// time (once a write range is returned from GetNetxRequestRange() it is 
completed an
-/// not re-enqueued), a scan range scheduled via a call to GetNextRange() can 
be queued up
-/// behind at most one write range.
-class DiskIoRequestContext {
-  using DiskQueue = DiskIoMgr::DiskQueue;
-  using RequestRange = DiskIoMgr::RequestRange;
-  using ScanRange = DiskIoMgr::ScanRange;
-  using WriteRange = DiskIoMgr::WriteRange;
-  using RequestType = DiskIoMgr::RequestType;
-
- public:
-  enum State {
-    /// Reader is initialized and maps to a client
-    Active,
-
-    /// Reader is in the process of being cancelled.  Cancellation is 
coordinated between
-    /// different threads and when they are all complete, the reader context 
is moved to
-    /// the inactive state.
-    Cancelled,
-
-    /// Reader context does not map to a client.  Accessing memory in this 
context
-    /// is invalid (i.e. it is equivalent to a dangling pointer).
-    Inactive,
-  };
-
-  DiskIoRequestContext(DiskIoMgr* parent, int num_disks);
-
-  /// Resets this object.
-  void Reset(MemTracker* tracker);
-
-  /// Decrements the number of active disks for this reader.  If the disk count
-  /// goes to 0, the disk complete condition variable is signaled.
-  /// Reader lock must be taken before this call.
-  void DecrementDiskRefCount() {
-    // boost doesn't let us dcheck that the reader lock is taken
-    DCHECK_GT(num_disks_with_ranges_, 0);
-    if (--num_disks_with_ranges_ == 0) {
-      disks_complete_cond_var_.NotifyAll();
-    }
-    DCHECK(Validate()) << std::endl << DebugString();
-  }
-
-  /// Reader & Disk Scheduling: Readers that currently can't do work are not on
-  /// the disk's queue. These readers are ones that don't have any ranges in 
the
-  /// in_flight_queue AND have not prepared a range by setting 
next_range_to_start.
-  /// The rule to make sure readers are scheduled correctly is to ensure 
anytime a
-  /// range is put on the in_flight_queue or anytime next_range_to_start is 
set to
-  /// NULL, the reader is scheduled.
-
-  /// Adds range to in_flight_ranges, scheduling this reader on the disk 
threads
-  /// if necessary.
-  /// Reader lock must be taken before this.
-  void ScheduleScanRange(ScanRange* range) {
-    DCHECK_EQ(state_, Active);
-    DCHECK(range != NULL);
-    DiskIoRequestContext::PerDiskState& state = disk_states_[range->disk_id()];
-    state.in_flight_ranges()->Enqueue(range);
-    state.ScheduleContext(this, range->disk_id());
-  }
-
-  /// Cancels the context with status code 'status'
-  void Cancel(const Status& status);
-
-  /// Adds request range to disk queue for this request context. Currently,
-  /// schedule_immediately must be false is RequestRange is a write range.
-  void AddRequestRange(RequestRange* range, bool schedule_immediately);
-
-  /// Validates invariants of reader.  Reader lock must be taken beforehand.
-  bool Validate() const;
-
-  /// Dumps out reader information.  Lock should be taken by caller
-  std::string DebugString() const;
-
- private:
-  friend class DiskIoMgr;
-  class PerDiskState;
-
-   /// Parent object
-  DiskIoMgr* parent_;
-
-  /// Memory used for this reader.  This is unowned by this object.
-  MemTracker* mem_tracker_;
-
-  /// Total bytes read for this reader
-  RuntimeProfile::Counter* bytes_read_counter_;
-
-  /// Total time spent in hdfs reading
-  RuntimeProfile::Counter* read_timer_;
-
-  /// Number of active read threads
-  RuntimeProfile::Counter* active_read_thread_counter_;
-
-  /// Disk access bitmap. The counter's bit[i] is set if disk id i has been 
accessed.
-  /// TODO: we can only support up to 64 disks with this bitmap but it lets us 
use a
-  /// builtin atomic instruction. Probably good enough for now.
-  RuntimeProfile::Counter* disks_accessed_bitmap_;
-
-  /// Total number of bytes read locally, updated at end of each range scan
-  AtomicInt64 bytes_read_local_;
-
-  /// Total number of bytes read via short circuit read, updated at end of 
each range scan
-  AtomicInt64 bytes_read_short_circuit_;
-
-  /// Total number of bytes read from date node cache, updated at end of each 
range scan
-  AtomicInt64 bytes_read_dn_cache_;
-
-  /// Total number of bytes from remote reads that were expected to be local.
-  AtomicInt64 unexpected_remote_bytes_;
-
-  /// The number of buffers that have been returned to the reader (via 
GetNext) that the
-  /// reader has not returned. Only included for debugging and diagnostics.
-  AtomicInt32 num_buffers_in_reader_;
-
-  /// The number of scan ranges that have been completed for this reader.
-  AtomicInt32 num_finished_ranges_;
-
-  /// The number of scan ranges that required a remote read, updated at the 
end of each
-  /// range scan. Only used for diagnostics.
-  AtomicInt32 num_remote_ranges_;
-
-  /// The total number of scan ranges that have not been started. Only used for
-  /// diagnostics. This is the sum of all unstarted_scan_ranges across all 
disks.
-  AtomicInt32 num_unstarted_scan_ranges_;
-
-  /// Total number of file handle opens where the file handle was present in 
the cache
-  AtomicInt32 cached_file_handles_hit_count_;
-
-  /// Total number of file handle opens where the file handle was not in the 
cache
-  AtomicInt32 cached_file_handles_miss_count_;
-
-  /// The number of buffers that are being used for this reader. This is the 
sum
-  /// of all buffers in ScanRange queues and buffers currently being read into 
(i.e. about
-  /// to be queued). This includes both IOMgr-allocated buffers and 
client-provided
-  /// buffers.
-  AtomicInt32 num_used_buffers_;
-
-  /// The total number of ready buffers across all ranges.  Ready buffers are 
buffers
-  /// that have been read from disk but not retrieved by the caller.
-  /// This is the sum of all queued buffers in all ranges for this reader 
context.
-  AtomicInt32 num_ready_buffers_;
-
-  /// All fields below are accessed by multiple threads and the lock needs to 
be
-  /// taken before accessing them. Must be acquired before ScanRange::lock_ if 
both
-  /// are held simultaneously.
-  boost::mutex lock_;
-
-  /// Current state of the reader
-  State state_;
-
-  /// Status of this reader.  Set to non-ok if cancelled.
-  Status status_;
-
-  /// The number of disks with scan ranges remaining (always equal to the sum 
of
-  /// disks with ranges).
-  int num_disks_with_ranges_;
-
-  /// This is the list of ranges that are expected to be cached on the DN.
-  /// When the reader asks for a new range (GetNextScanRange()), we first
-  /// return ranges from this list.
-  InternalQueue<ScanRange> cached_ranges_;
-
-  /// A list of ranges that should be returned in subsequent calls to
-  /// GetNextRange.
-  /// There is a trade-off with when to populate this list.  Populating it on
-  /// demand means consumers need to wait (happens in 
DiskIoMgr::GetNextRange()).
-  /// Populating it preemptively means we make worse scheduling decisions.
-  /// We currently populate one range per disk.
-  /// TODO: think about this some more.
-  InternalQueue<ScanRange> ready_to_start_ranges_;
-  ConditionVariable ready_to_start_ranges_cv_;  // used with lock_
-
-  /// Ranges that are blocked due to back pressure on outgoing buffers.
-  InternalQueue<ScanRange> blocked_ranges_;
-
-  /// Condition variable for UnregisterContext() to wait for all disks to 
complete
-  ConditionVariable disks_complete_cond_var_;
-
-  /// Struct containing state per disk. See comments in the disk read loop on 
how
-  /// they are used.
-  class PerDiskState {
-   public:
-    bool done() const { return done_; }
-    void set_done(bool b) { done_ = b; }
-
-    int num_remaining_ranges() const { return num_remaining_ranges_; }
-    int& num_remaining_ranges() { return num_remaining_ranges_; }
-
-    ScanRange* next_scan_range_to_start() { return next_scan_range_to_start_; }
-    void set_next_scan_range_to_start(ScanRange* range) {
-      next_scan_range_to_start_ = range;
-    }
-
-    /// We need to have a memory barrier to prevent this load from being 
reordered
-    /// with num_threads_in_op(), since these variables are set without the 
reader
-    /// lock taken
-    bool is_on_queue() const {
-      bool b = is_on_queue_;
-      __sync_synchronize();
-      return b;
-    }
-
-    int num_threads_in_op() const {
-      int v = num_threads_in_op_.Load();
-      // TODO: determine whether this barrier is necessary for any callsites.
-      AtomicUtil::MemoryBarrier();
-      return v;
-    }
-
-    const InternalQueue<ScanRange>* unstarted_scan_ranges() const {
-      return &unstarted_scan_ranges_;
-    }
-    const InternalQueue<WriteRange>* unstarted_write_ranges() const {
-      return &unstarted_write_ranges_;
-    }
-    const InternalQueue<RequestRange>* in_flight_ranges() const {
-      return &in_flight_ranges_;
-    }
-
-    InternalQueue<ScanRange>* unstarted_scan_ranges() {
-      return &unstarted_scan_ranges_;
-    }
-    InternalQueue<WriteRange>* unstarted_write_ranges() {
-      return &unstarted_write_ranges_;
-    }
-    InternalQueue<RequestRange>* in_flight_ranges() {
-      return &in_flight_ranges_;
-    }
-
-    PerDiskState() {
-      Reset();
-    }
-
-    /// Schedules the request context on this disk if it's not already on the 
queue.
-    /// Context lock must be taken before this.
-    void ScheduleContext(DiskIoRequestContext* context, int disk_id) {
-      if (!is_on_queue_ && !done_) {
-        is_on_queue_ = true;
-        context->parent_->disk_queues_[disk_id]->EnqueueContext(context);
-      }
-    }
-
-    /// Increment the ref count on reader.  We need to track the number of 
threads per
-    /// reader per disk that are in the unlocked hdfs read code section. This 
is updated
-    /// by multiple threads without a lock so we need to use an atomic int.
-    void IncrementRequestThreadAndDequeue() {
-      num_threads_in_op_.Add(1);
-      is_on_queue_ = false;
-    }
-
-    void DecrementRequestThread() {
-      num_threads_in_op_.Add(-1);
-    }
-
-    /// Decrement request thread count and do final cleanup if this is the last
-    /// thread. RequestContext lock must be taken before this.
-    void DecrementRequestThreadAndCheckDone(DiskIoRequestContext* context) {
-      num_threads_in_op_.Add(-1); // Also acts as a barrier.
-      if (!is_on_queue_ && num_threads_in_op_.Load() == 0 && !done_) {
-        // This thread is the last one for this reader on this disk, do final 
cleanup
-        context->DecrementDiskRefCount();
-        done_ = true;
-      }
-    }
-
-    void Reset() {
-      DCHECK(in_flight_ranges_.empty());
-      DCHECK(unstarted_scan_ranges_.empty());
-      DCHECK(unstarted_write_ranges_.empty());
-
-      done_ = true;
-      num_remaining_ranges_ = 0;
-      is_on_queue_ = false;
-      num_threads_in_op_.Store(0);
-      next_scan_range_to_start_ = NULL;
-    }
-
-   private:
-    /// If true, this disk is all done for this request context, including any 
cleanup.
-    /// If done is true, it means that this request must not be on this disk's 
queue
-    /// *AND* there are no threads currently working on this context. To 
satisfy
-    /// this, only the last thread (per disk) can set this to true.
-    bool done_;
-
-    /// For each disk, keeps track if the context is on this disk's queue, 
indicating
-    /// the disk must do some work for this context. The disk needs to do work 
in 4 cases:
-    ///  1) in_flight_ranges is not empty, the disk needs to read for this 
reader.
-    ///  2) next_range_to_start is NULL, the disk needs to prepare a scan 
range to be
-    ///     read next.
-    ///  3) the reader has been cancelled and this disk needs to participate 
in the
-    ///     cleanup.
-    ///  4) A write range is added to queue.
-    /// In general, we only want to put a context on the disk queue if there 
is something
-    /// useful that can be done. If there's nothing useful, the disk queue 
will wake up
-    /// and then remove the reader from the queue. Doing this causes thrashing 
of the
-    /// threads.
-    bool is_on_queue_;
-
-    /// For each disks, the number of request ranges that have not been fully 
read.
-    /// In the non-cancellation path, this will hit 0, and done will be set to 
true
-    /// by the disk thread. This is undefined in the cancellation path (the 
various
-    /// threads notice by looking at the DiskIoRequestContext's state_).
-    int num_remaining_ranges_;
-
-    /// Queue of ranges that have not started being read.  This list is 
exclusive
-    /// with in_flight_ranges.
-    InternalQueue<ScanRange> unstarted_scan_ranges_;
-
-    /// Queue of pending IO requests for this disk in the order that they will 
be
-    /// processed. A ScanRange is added to this queue when it is returned in
-    /// GetNextRange(), or when it is added with schedule_immediately = true.
-    /// A WriteRange is added to this queue from unstarted_write_ranges_ for 
each
-    /// invocation of GetNextRequestRange() in WorkLoop().
-    /// The size of this queue is always less than or equal to 
num_remaining_ranges.
-    InternalQueue<RequestRange> in_flight_ranges_;
-
-    /// The next range to start for this reader on this disk. Each disk (for 
each reader)
-    /// picks the next range to start. The range is set here and also added to 
the
-    /// ready_to_start_ranges_ queue. The reader pulls from the queue in FIFO 
order,
-    /// so the ranges from different disks are round-robined. When the range 
is pulled
-    /// off the ready_to_start_ranges_ queue, it sets this variable to NULL, 
so the disk
-    /// knows to populate it again and add it to ready_to_start_ranges_ i.e. 
it is used
-    /// as a flag by DiskIoMgr::GetNextScanRange to determine if it needs to 
add another
-    /// range to ready_to_start_ranges_.
-    ScanRange* next_scan_range_to_start_;
-
-    /// For each disk, the number of threads issuing the underlying read/write 
on behalf
-    /// of this context. There are a few places where we release the context 
lock, do some
-    /// work, and then grab the lock again.  Because we don't hold the lock 
for the
-    /// entire operation, we need this ref count to keep track of which thread 
should do
-    /// final resource cleanup during cancellation.
-    /// Only the thread that sees the count at 0 should do the final cleanup.
-    AtomicInt32 num_threads_in_op_;
-
-    /// Queue of write ranges to process for this disk. A write range is 
always added
-    /// to in_flight_ranges_ in GetNextRequestRange(). There is a separate
-    /// unstarted_read_ranges_ and unstarted_write_ranges_ to alternate 
between reads
-    /// and writes. (Otherwise, since next_scan_range_to_start is set
-    /// in GetNextRequestRange() whenever it is null, repeated calls to
-    /// GetNextRequestRange() and GetNextRange() may result in only reads 
being processed)
-    InternalQueue<WriteRange> unstarted_write_ranges_;
-  };
-
-  /// Per disk states to synchronize multiple disk threads accessing the same 
request
-  /// context.
-  std::vector<PerDiskState> disk_states_;
-};
-
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/87fc463e/be/src/runtime/disk-io-mgr-reader-context.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-reader-context.cc 
b/be/src/runtime/disk-io-mgr-reader-context.cc
index afe2b23..d62545b 100644
--- a/be/src/runtime/disk-io-mgr-reader-context.cc
+++ b/be/src/runtime/disk-io-mgr-reader-context.cc
@@ -91,6 +91,23 @@ void DiskIoRequestContext::Cancel(const Status& status) {
   ready_to_start_ranges_cv_.NotifyAll();
 }
 
+void DiskIoRequestContext::CancelAndMarkInactive() {
+  Cancel(Status::CANCELLED);
+
+  boost::unique_lock<boost::mutex> l(lock_);
+  DCHECK_NE(state_, Inactive);
+  DCHECK(Validate()) << endl << DebugString();
+
+  // Wait until the ranges finish up.
+  while (num_disks_with_ranges_ > 0) disks_complete_cond_var_.Wait(l);
+
+  // Validate that no buffers were leaked from this context.
+  DCHECK_EQ(num_buffers_in_reader_.Load(), 0) << endl << DebugString();
+  DCHECK_EQ(num_used_buffers_.Load(), 0) << endl << DebugString();
+  DCHECK(Validate()) << endl << DebugString();
+  state_ = Inactive;
+}
+
 void DiskIoRequestContext::AddRequestRange(
     DiskIoMgr::RequestRange* range, bool schedule_immediately) {
   // DCHECK(lock_.is_locked()); // TODO: boost should have this API
@@ -129,51 +146,9 @@ void DiskIoRequestContext::AddRequestRange(
   ++state.num_remaining_ranges();
 }
 
-DiskIoRequestContext::DiskIoRequestContext(DiskIoMgr* parent, int num_disks)
-  : parent_(parent),
-    bytes_read_counter_(NULL),
-    read_timer_(NULL),
-    active_read_thread_counter_(NULL),
-    disks_accessed_bitmap_(NULL),
-    state_(Inactive),
-    disk_states_(num_disks) {
-}
-
-// Resets this object.
-void DiskIoRequestContext::Reset(MemTracker* tracker) {
-  DCHECK_EQ(state_, Inactive);
-  status_ = Status::OK();
-
-  bytes_read_counter_ = NULL;
-  read_timer_ = NULL;
-  active_read_thread_counter_ = NULL;
-  disks_accessed_bitmap_ = NULL;
-
-  state_ = Active;
-  mem_tracker_ = tracker;
-
-  num_unstarted_scan_ranges_.Store(0);
-  num_disks_with_ranges_ = 0;
-  num_used_buffers_.Store(0);
-  num_buffers_in_reader_.Store(0);
-  num_ready_buffers_.Store(0);
-  num_finished_ranges_.Store(0);
-  num_remote_ranges_.Store(0);
-  bytes_read_local_.Store(0);
-  bytes_read_short_circuit_.Store(0);
-  bytes_read_dn_cache_.Store(0);
-  unexpected_remote_bytes_.Store(0);
-  cached_file_handles_hit_count_.Store(0);
-  cached_file_handles_miss_count_.Store(0);
-
-  DCHECK(ready_to_start_ranges_.empty());
-  DCHECK(blocked_ranges_.empty());
-  DCHECK(cached_ranges_.empty());
-
-  for (int i = 0; i < disk_states_.size(); ++i) {
-    disk_states_[i].Reset();
-  }
-}
+DiskIoRequestContext::DiskIoRequestContext(
+    DiskIoMgr* parent, int num_disks, MemTracker* tracker)
+  : parent_(parent), mem_tracker_(tracker), disk_states_(num_disks) {}
 
 // Dumps out request context information. Lock should be taken by caller
 string DiskIoRequestContext::DebugString() const {
@@ -307,3 +282,11 @@ bool DiskIoRequestContext::Validate() const {
 
   return true;
 }
+
+void DiskIoRequestContext::PerDiskState::ScheduleContext(
+    DiskIoRequestContext* context, int disk_id) {
+  if (!is_on_queue_ && !done_) {
+    is_on_queue_ = true;
+    context->parent_->disk_queues_[disk_id]->EnqueueContext(context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/87fc463e/be/src/runtime/disk-io-mgr-reader-context.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-reader-context.h 
b/be/src/runtime/disk-io-mgr-reader-context.h
new file mode 100644
index 0000000..90426d9
--- /dev/null
+++ b/be/src/runtime/disk-io-mgr-reader-context.h
@@ -0,0 +1,406 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_DISK_IO_MGR_READER_CONTEXT_H
+#define IMPALA_RUNTIME_DISK_IO_MGR_READER_CONTEXT_H
+
+#include "runtime/disk-io-mgr.h"
+#include "util/condition-variable.h"
+
+namespace impala {
+
+/// A request context is used to group together I/O requests belonging to a 
client of the
+/// I/O manager for management and scheduling. For most I/O manager clients it 
is an
+/// opaque pointer, but some clients may need to include this header, e.g. to 
make the
+/// unique_ptr<DiskIoRequestContext> destructor work correctly.
+///
+/// Implementation Details
+/// ======================
+/// This object maintains a lot of state that is carefully synchronized. The 
context
+/// maintains state across all disks as well as per disk state.
+/// The unit for an IO request is a RequestRange, which may be a ScanRange or a
+/// WriteRange.
+/// A scan range for the reader is on one of five states:
+/// 1) PerDiskState's unstarted_ranges: This range has only been queued
+///    and nothing has been read from it.
+/// 2) DiskIoRequestContext's ready_to_start_ranges_: This range is about to 
be started.
+///    As soon as the reader picks it up, it will move to the in_flight_ranges
+///    queue.
+/// 3) PerDiskState's in_flight_ranges: This range is being processed and will
+///    be read from the next time a disk thread picks it up in 
GetNextRequestRange()
+/// 4) ScanRange's outgoing ready buffers is full. We can't read for this range
+///    anymore. We need the caller to pull a buffer off which will put this in
+///    the in_flight_ranges queue. These ranges are in the 
DiskIoRequestContext's
+///    blocked_ranges_ queue.
+/// 5) ScanRange is cached and in the cached_ranges_ queue.
+//
+/// If the scan range is read and does not get blocked on the outgoing queue, 
the
+/// transitions are: 1 -> 2 -> 3.
+/// If the scan range does get blocked, the transitions are
+/// 1 -> 2 -> 3 -> (4 -> 3)*
+//
+/// In the case of a cached scan range, the range is immediately put in 
cached_ranges_.
+/// When the caller asks for the next range to process, we first pull ranges 
from
+/// the cache_ranges_ queue. If the range was cached, the range is removed and
+/// done (ranges are either entirely cached or not at all). If the cached read 
attempt
+/// fails, we put the range in state 1.
+//
+/// A write range for a context may be in one of two lists:
+/// 1) unstarted_write_ranges_ : Ranges that have been queued but not 
processed.
+/// 2) in_flight_ranges_: The write range is ready to be processed by the next 
disk thread
+///    that picks it up in GetNextRequestRange().
+//
+/// AddWriteRange() adds WriteRanges for a disk.
+/// It is the responsibility of the client to pin the data to be written via a 
WriteRange
+/// in memory. After a WriteRange has been written, a callback is invoked to 
inform the
+/// client that the write has completed.
+//
+/// An important assumption is that write does not exceed the maximum read 
size and that
+/// the entire range is written when the write request is handled. (In other 
words, writes
+/// are not broken up.)
+//
+/// When a DiskIoRequestContext is processed by a disk thread in 
GetNextRequestRange(),
+/// a write range is always removed from the list of unstarted write ranges 
and appended
+/// to the in_flight_ranges_ queue. This is done to alternate reads and writes 
- a read
+/// that is scheduled (by calling GetNextRange()) is always followed by a 
write (if one
+/// exists).  And since at most one WriteRange can be present in 
in_flight_ranges_ at any
+/// time (once a write range is returned from GetNetxRequestRange() it is 
completed an
+/// not re-enqueued), a scan range scheduled via a call to GetNextRange() can 
be queued up
+/// behind at most one write range.
+class DiskIoRequestContext {
+  using RequestRange = DiskIoMgr::RequestRange;
+  using ScanRange = DiskIoMgr::ScanRange;
+  using WriteRange = DiskIoMgr::WriteRange;
+  using RequestType = DiskIoMgr::RequestType;
+
+ public:
+  ~DiskIoRequestContext() { DCHECK_EQ(state_, Inactive) << "Must be 
unregistered."; }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(DiskIoRequestContext);
+  friend class DiskIoMgr;
+
+  class PerDiskState;
+
+  enum State {
+    /// Reader is initialized and maps to a client
+    Active,
+
+    /// Reader is in the process of being cancelled.  Cancellation is 
coordinated between
+    /// different threads and when they are all complete, the reader context 
is moved to
+    /// the inactive state.
+    Cancelled,
+
+    /// Reader context does not map to a client.  Accessing memory in this 
context
+    /// is invalid (i.e. it is equivalent to a dangling pointer).
+    Inactive,
+  };
+
+  DiskIoRequestContext(DiskIoMgr* parent, int num_disks, MemTracker* tracker);
+
+  /// Decrements the number of active disks for this reader.  If the disk count
+  /// goes to 0, the disk complete condition variable is signaled.
+  /// Reader lock must be taken before this call.
+  void DecrementDiskRefCount() {
+    // boost doesn't let us dcheck that the reader lock is taken
+    DCHECK_GT(num_disks_with_ranges_, 0);
+    if (--num_disks_with_ranges_ == 0) {
+      disks_complete_cond_var_.NotifyAll();
+    }
+    DCHECK(Validate()) << std::endl << DebugString();
+  }
+
+  /// Reader & Disk Scheduling: Readers that currently can't do work are not on
+  /// the disk's queue. These readers are ones that don't have any ranges in 
the
+  /// in_flight_queue AND have not prepared a range by setting 
next_range_to_start.
+  /// The rule to make sure readers are scheduled correctly is to ensure 
anytime a
+  /// range is put on the in_flight_queue or anytime next_range_to_start is 
set to
+  /// NULL, the reader is scheduled.
+
+  /// Adds range to in_flight_ranges, scheduling this reader on the disk 
threads
+  /// if necessary.
+  /// Reader lock must be taken before this.
+  void ScheduleScanRange(ScanRange* range) {
+    DCHECK_EQ(state_, Active);
+    DCHECK(range != NULL);
+    DiskIoRequestContext::PerDiskState& state = disk_states_[range->disk_id()];
+    state.in_flight_ranges()->Enqueue(range);
+    state.ScheduleContext(this, range->disk_id());
+  }
+
+  /// Cancels the context with status code 'status'
+  void Cancel(const Status& status);
+
+  /// Cancel the context if not already cancelled, wait for all scan ranges to 
finish
+  /// and mark the context as inactive, after which it cannot be used.
+  void CancelAndMarkInactive();
+
+  /// Adds request range to disk queue for this request context. Currently,
+  /// schedule_immediately must be false is RequestRange is a write range.
+  void AddRequestRange(RequestRange* range, bool schedule_immediately);
+
+  /// Validates invariants of reader.  Reader lock must be taken beforehand.
+  bool Validate() const;
+
+  /// Dumps out reader information.  Lock should be taken by caller
+  std::string DebugString() const;
+
+  /// Parent object
+  DiskIoMgr* const parent_;
+
+  /// Memory used for this reader.  This is unowned by this object.
+  MemTracker* const mem_tracker_;
+
+  /// Total bytes read for this reader
+  RuntimeProfile::Counter* bytes_read_counter_ = nullptr;
+
+  /// Total time spent in hdfs reading
+  RuntimeProfile::Counter* read_timer_ = nullptr;
+
+  /// Number of active read threads
+  RuntimeProfile::Counter* active_read_thread_counter_ = nullptr;
+
+  /// Disk access bitmap. The counter's bit[i] is set if disk id i has been 
accessed.
+  /// TODO: we can only support up to 64 disks with this bitmap but it lets us 
use a
+  /// builtin atomic instruction. Probably good enough for now.
+  RuntimeProfile::Counter* disks_accessed_bitmap_ = nullptr;
+
+  /// Total number of bytes read locally, updated at end of each range scan
+  AtomicInt64 bytes_read_local_{0};
+
+  /// Total number of bytes read via short circuit read, updated at end of 
each range scan
+  AtomicInt64 bytes_read_short_circuit_{0};
+
+  /// Total number of bytes read from date node cache, updated at end of each 
range scan
+  AtomicInt64 bytes_read_dn_cache_{0};
+
+  /// Total number of bytes from remote reads that were expected to be local.
+  AtomicInt64 unexpected_remote_bytes_{0};
+
+  /// The number of buffers that have been returned to the reader (via 
GetNext) that the
+  /// reader has not returned. Only included for debugging and diagnostics.
+  AtomicInt32 num_buffers_in_reader_{0};
+
+  /// The number of scan ranges that have been completed for this reader.
+  AtomicInt32 num_finished_ranges_{0};
+
+  /// The number of scan ranges that required a remote read, updated at the 
end of each
+  /// range scan. Only used for diagnostics.
+  AtomicInt32 num_remote_ranges_{0};
+
+  /// The total number of scan ranges that have not been started. Only used for
+  /// diagnostics. This is the sum of all unstarted_scan_ranges across all 
disks.
+  AtomicInt32 num_unstarted_scan_ranges_{0};
+
+  /// Total number of file handle opens where the file handle was present in 
the cache
+  AtomicInt32 cached_file_handles_hit_count_{0};
+
+  /// Total number of file handle opens where the file handle was not in the 
cache
+  AtomicInt32 cached_file_handles_miss_count_{0};
+
+  /// The number of buffers that are being used for this reader. This is the 
sum
+  /// of all buffers in ScanRange queues and buffers currently being read into 
(i.e. about
+  /// to be queued). This includes both IOMgr-allocated buffers and 
client-provided
+  /// buffers.
+  AtomicInt32 num_used_buffers_{0};
+
+  /// The total number of ready buffers across all ranges.  Ready buffers are 
buffers
+  /// that have been read from disk but not retrieved by the caller.
+  /// This is the sum of all queued buffers in all ranges for this reader 
context.
+  AtomicInt32 num_ready_buffers_{0};
+
+  /// All fields below are accessed by multiple threads and the lock needs to 
be
+  /// taken before accessing them. Must be acquired before ScanRange::lock_ if 
both
+  /// are held simultaneously.
+  boost::mutex lock_;
+
+  /// Current state of the reader
+  State state_ = Active;
+
+  /// Status of this reader.  Set to non-ok if cancelled.
+  Status status_;
+
+  /// The number of disks with scan ranges remaining (always equal to the sum 
of
+  /// disks with ranges).
+  int num_disks_with_ranges_ = 0;
+
+  /// This is the list of ranges that are expected to be cached on the DN.
+  /// When the reader asks for a new range (GetNextScanRange()), we first
+  /// return ranges from this list.
+  InternalQueue<ScanRange> cached_ranges_;
+
+  /// A list of ranges that should be returned in subsequent calls to
+  /// GetNextRange.
+  /// There is a trade-off with when to populate this list.  Populating it on
+  /// demand means consumers need to wait (happens in 
DiskIoMgr::GetNextRange()).
+  /// Populating it preemptively means we make worse scheduling decisions.
+  /// We currently populate one range per disk.
+  /// TODO: think about this some more.
+  InternalQueue<ScanRange> ready_to_start_ranges_;
+  ConditionVariable ready_to_start_ranges_cv_; // used with lock_
+
+  /// Ranges that are blocked due to back pressure on outgoing buffers.
+  InternalQueue<ScanRange> blocked_ranges_;
+
+  /// Condition variable for UnregisterContext() to wait for all disks to 
complete
+  ConditionVariable disks_complete_cond_var_;
+
+  /// Struct containing state per disk. See comments in the disk read loop on 
how
+  /// they are used.
+  class PerDiskState {
+   public:
+    bool done() const { return done_; }
+    void set_done(bool b) { done_ = b; }
+
+    int num_remaining_ranges() const { return num_remaining_ranges_; }
+    int& num_remaining_ranges() { return num_remaining_ranges_; }
+
+    ScanRange* next_scan_range_to_start() { return next_scan_range_to_start_; }
+    void set_next_scan_range_to_start(ScanRange* range) {
+      next_scan_range_to_start_ = range;
+    }
+
+    /// We need to have a memory barrier to prevent this load from being 
reordered
+    /// with num_threads_in_op(), since these variables are set without the 
reader
+    /// lock taken
+    bool is_on_queue() const {
+      bool b = is_on_queue_;
+      __sync_synchronize();
+      return b;
+    }
+
+    int num_threads_in_op() const {
+      int v = num_threads_in_op_.Load();
+      // TODO: determine whether this barrier is necessary for any callsites.
+      AtomicUtil::MemoryBarrier();
+      return v;
+    }
+
+    const InternalQueue<ScanRange>* unstarted_scan_ranges() const {
+      return &unstarted_scan_ranges_;
+    }
+    const InternalQueue<WriteRange>* unstarted_write_ranges() const {
+      return &unstarted_write_ranges_;
+    }
+    const InternalQueue<RequestRange>* in_flight_ranges() const {
+      return &in_flight_ranges_;
+    }
+
+    InternalQueue<ScanRange>* unstarted_scan_ranges() { return 
&unstarted_scan_ranges_; }
+    InternalQueue<WriteRange>* unstarted_write_ranges() {
+      return &unstarted_write_ranges_;
+    }
+    InternalQueue<RequestRange>* in_flight_ranges() { return 
&in_flight_ranges_; }
+
+    /// Schedules the request context on this disk if it's not already on the 
queue.
+    /// Context lock must be taken before this.
+    void ScheduleContext(DiskIoRequestContext* context, int disk_id);
+
+    /// Increment the ref count on reader.  We need to track the number of 
threads per
+    /// reader per disk that are in the unlocked hdfs read code section. This 
is updated
+    /// by multiple threads without a lock so we need to use an atomic int.
+    void IncrementRequestThreadAndDequeue() {
+      num_threads_in_op_.Add(1);
+      is_on_queue_ = false;
+    }
+
+    void DecrementRequestThread() { num_threads_in_op_.Add(-1); }
+
+    /// Decrement request thread count and do final cleanup if this is the last
+    /// thread. RequestContext lock must be taken before this.
+    void DecrementRequestThreadAndCheckDone(DiskIoRequestContext* context) {
+      num_threads_in_op_.Add(-1); // Also acts as a barrier.
+      if (!is_on_queue_ && num_threads_in_op_.Load() == 0 && !done_) {
+        // This thread is the last one for this reader on this disk, do final 
cleanup
+        context->DecrementDiskRefCount();
+        done_ = true;
+      }
+    }
+
+   private:
+    /// If true, this disk is all done for this request context, including any 
cleanup.
+    /// If done is true, it means that this request must not be on this disk's 
queue
+    /// *AND* there are no threads currently working on this context. To 
satisfy
+    /// this, only the last thread (per disk) can set this to true.
+    bool done_ = true;
+
+    /// For each disk, keeps track if the context is on this disk's queue, 
indicating
+    /// the disk must do some work for this context. The disk needs to do work 
in 4 cases:
+    ///  1) in_flight_ranges is not empty, the disk needs to read for this 
reader.
+    ///  2) next_range_to_start is NULL, the disk needs to prepare a scan 
range to be
+    ///     read next.
+    ///  3) the reader has been cancelled and this disk needs to participate 
in the
+    ///     cleanup.
+    ///  4) A write range is added to queue.
+    /// In general, we only want to put a context on the disk queue if there 
is something
+    /// useful that can be done. If there's nothing useful, the disk queue 
will wake up
+    /// and then remove the reader from the queue. Doing this causes thrashing 
of the
+    /// threads.
+    bool is_on_queue_ = false;
+
+    /// For each disks, the number of request ranges that have not been fully 
read.
+    /// In the non-cancellation path, this will hit 0, and done will be set to 
true
+    /// by the disk thread. This is undefined in the cancellation path (the 
various
+    /// threads notice by looking at the DiskIoRequestContext's state_).
+    int num_remaining_ranges_ = 0;
+
+    /// Queue of ranges that have not started being read.  This list is 
exclusive
+    /// with in_flight_ranges.
+    InternalQueue<ScanRange> unstarted_scan_ranges_;
+
+    /// Queue of pending IO requests for this disk in the order that they will 
be
+    /// processed. A ScanRange is added to this queue when it is returned in
+    /// GetNextRange(), or when it is added with schedule_immediately = true.
+    /// A WriteRange is added to this queue from unstarted_write_ranges_ for 
each
+    /// invocation of GetNextRequestRange() in WorkLoop().
+    /// The size of this queue is always less than or equal to 
num_remaining_ranges.
+    InternalQueue<RequestRange> in_flight_ranges_;
+
+    /// The next range to start for this reader on this disk. Each disk (for 
each reader)
+    /// picks the next range to start. The range is set here and also added to 
the
+    /// ready_to_start_ranges_ queue. The reader pulls from the queue in FIFO 
order,
+    /// so the ranges from different disks are round-robined. When the range 
is pulled
+    /// off the ready_to_start_ranges_ queue, it sets this variable to NULL, 
so the disk
+    /// knows to populate it again and add it to ready_to_start_ranges_ i.e. 
it is used
+    /// as a flag by DiskIoMgr::GetNextScanRange to determine if it needs to 
add another
+    /// range to ready_to_start_ranges_.
+    ScanRange* next_scan_range_to_start_ = nullptr;
+
+    /// For each disk, the number of threads issuing the underlying read/write 
on behalf
+    /// of this context. There are a few places where we release the context 
lock, do some
+    /// work, and then grab the lock again.  Because we don't hold the lock 
for the
+    /// entire operation, we need this ref count to keep track of which thread 
should do
+    /// final resource cleanup during cancellation.
+    /// Only the thread that sees the count at 0 should do the final cleanup.
+    AtomicInt32 num_threads_in_op_{0};
+
+    /// Queue of write ranges to process for this disk. A write range is 
always added
+    /// to in_flight_ranges_ in GetNextRequestRange(). There is a separate
+    /// unstarted_read_ranges_ and unstarted_write_ranges_ to alternate 
between reads
+    /// and writes. (Otherwise, since next_scan_range_to_start is set
+    /// in GetNextRequestRange() whenever it is null, repeated calls to
+    /// GetNextRequestRange() and GetNextRange() may result in only reads 
being processed)
+    InternalQueue<WriteRange> unstarted_write_ranges_;
+  };
+
+  /// Per disk states to synchronize multiple disk threads accessing the same 
request
+  /// context.
+  std::vector<PerDiskState> disk_states_;
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/87fc463e/be/src/runtime/disk-io-mgr-stress.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-stress.cc 
b/be/src/runtime/disk-io-mgr-stress.cc
index 3959194..a98c3a4 100644
--- a/be/src/runtime/disk-io-mgr-stress.cc
+++ b/be/src/runtime/disk-io-mgr-stress.cc
@@ -19,6 +19,7 @@
 
 #include "runtime/disk-io-mgr-stress.h"
 
+#include "runtime/disk-io-mgr-reader-context.h"
 #include "util/time.h"
 
 #include "common/names.h"
@@ -57,7 +58,7 @@ string GenerateRandomData() {
 
 struct DiskIoMgrStress::Client {
   boost::mutex lock;
-  DiskIoRequestContext* reader;
+  unique_ptr<DiskIoRequestContext> reader;
   int file_idx;
   vector<DiskIoMgr::ScanRange*> scan_ranges;
   int abort_at_byte;
@@ -108,7 +109,7 @@ void DiskIoMgrStress::ClientThread(int client_id) {
 
     while (!eos) {
       DiskIoMgr::ScanRange* range;
-      Status status = io_mgr_->GetNextRange(client->reader, &range);
+      Status status = io_mgr_->GetNextRange(client->reader.get(), &range);
       CHECK(status.ok() || status.IsCancelled());
       if (range == NULL) break;
 
@@ -156,12 +157,12 @@ void DiskIoMgrStress::ClientThread(int client_id) {
 
     // Unregister the old client and get a new one
     unique_lock<mutex> lock(client->lock);
-    io_mgr_->UnregisterContext(client->reader);
+    io_mgr_->UnregisterContext(client->reader.get());
     NewClient(client_id);
   }
 
   unique_lock<mutex> lock(client->lock);
-  io_mgr_->UnregisterContext(client->reader);
+  io_mgr_->UnregisterContext(client->reader.get());
   client->reader = NULL;
 }
 
@@ -172,7 +173,7 @@ void DiskIoMgrStress::CancelRandomReader() {
   int rand_client = rand() % num_clients_;
 
   unique_lock<mutex> lock(clients_[rand_client].lock);
-  io_mgr_->CancelContext(clients_[rand_client].reader);
+  io_mgr_->CancelContext(clients_[rand_client].reader.get());
 }
 
 void DiskIoMgrStress::Run(int sec) {
@@ -197,7 +198,7 @@ void DiskIoMgrStress::Run(int sec) {
 
   for (int i = 0; i < num_clients_; ++i) {
     unique_lock<mutex> lock(clients_[i].lock);
-    if (clients_[i].reader != NULL) io_mgr_->CancelContext(clients_[i].reader);
+    if (clients_[i].reader != NULL) 
io_mgr_->CancelContext(clients_[i].reader.get());
   }
 
   readers_.join_all();
@@ -239,7 +240,7 @@ void DiskIoMgrStress::NewClient(int i) {
   }
 
   client_mem_trackers_[i].reset(new MemTracker(-1, "", &mem_tracker_));
-  io_mgr_->RegisterContext(&client.reader, client_mem_trackers_[i].get());
-  Status status = io_mgr_->AddScanRanges(client.reader, client.scan_ranges);
+  client.reader = io_mgr_->RegisterContext(client_mem_trackers_[i].get());
+  Status status = io_mgr_->AddScanRanges(client.reader.get(), 
client.scan_ranges);
   CHECK(status.ok());
 }

Reply via email to