http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h
deleted file mode 100644
index 49de0ff..0000000
--- a/be/src/runtime/disk-io-mgr.h
+++ /dev/null
@@ -1,972 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_RUNTIME_DISK_IO_MGR_H
-#define IMPALA_RUNTIME_DISK_IO_MGR_H
-
-#include <deque>
-#include <functional>
-#include <vector>
-
-#include <boost/scoped_ptr.hpp>
-#include <boost/unordered_set.hpp>
-#include <boost/thread/mutex.hpp>
-
-#include "common/atomic.h"
-#include "common/hdfs.h"
-#include "common/object-pool.h"
-#include "common/status.h"
-#include "runtime/disk-io-mgr-handle-cache.h"
-#include "runtime/thread-resource-mgr.h"
-#include "util/aligned-new.h"
-#include "util/bit-util.h"
-#include "util/condition-variable.h"
-#include "util/error-util.h"
-#include "util/internal-queue.h"
-#include "util/runtime-profile.h"
-#include "util/thread.h"
-
-namespace impala {
-
-class MemTracker;
-
-/// Manager object that schedules IO for all queries on all disks and remote filesystems
-/// (such as S3). Each query maps to one or more DiskIoRequestContext objects, each of which
-/// has its own queue of scan ranges and/or write ranges.
-//
-/// The API splits up requesting scan/write ranges (non-blocking) and reading the data
-/// (blocking). The DiskIoMgr has worker threads that will read from and write to
-/// disk/hdfs/remote-filesystems, allowing interleaving of IO and CPU. This allows us to
-/// keep all disks and all cores as busy as possible.
-//
-/// All public APIs are thread-safe. It is not valid to call any of the APIs after
-/// UnregisterContext() returns.
-//
-/// For Readers:
-/// We can model this problem as a multiple producer (threads for each disk), multiple
-/// consumer (scan ranges) problem. There are multiple queues that need to be
-/// synchronized. Conceptually, there are two queues:
-///   1. The per disk queue: this contains a queue of readers that need reads.
-///   2. The per scan range ready-buffer queue: this contains buffers that have been
-///      read and are ready for the caller.
-/// The disk queue contains a queue of readers and is scheduled in a round robin fashion.
-/// Readers map to scan nodes. The reader then contains a queue of scan ranges. The caller
-/// asks the IoMgr for the next range to process. The IoMgr then selects the best range
-/// to read based on disk activity and begins reading and queuing buffers for that range.
-/// TODO: We should map readers to queries. A reader is the unit of scheduling and queries
-/// that have multiple scan nodes shouldn't have more 'turns'.
-//
-/// For Writers:
-/// Data is written via AddWriteRange(). This is non-blocking and adds a WriteRange to a
-/// per-disk queue. After the write is complete, a callback in WriteRange is invoked.
-/// No memory is allocated within IoMgr for writes and no copies are made. It is the
-/// responsibility of the client to ensure that the data to be written is valid and that
-/// the file to be written to exists until the callback is invoked.
-//
-/// The IoMgr provides three key APIs.
-///  1. AddScanRanges: this is non-blocking and tells the IoMgr all the ranges that
-///     will eventually need to be read.
-///  2. GetNextRange: returns to the caller the next scan range it should process.
-///     This is based on disk load. This also begins reading the data in this scan
-///     range. This is blocking.
-///  3. ScanRange::GetNext: returns the next buffer for this range.  This is blocking.
-//
-/// The disk threads do not synchronize with each other. The readers and writers don't
-/// synchronize with each other. There is a lock and condition variable for each request
-/// context queue and each disk queue.
-/// IMPORTANT: whenever both locks are needed, the lock order is to grab the context lock
-/// before the disk lock.
-//
-/// Scheduling: If there are multiple request contexts with work for a single disk, the
-/// request contexts are scheduled in round-robin order. Multiple disk threads can
-/// operate on the same request context. Exactly one request range is processed by a
-/// disk thread at a time. If there are multiple scan ranges scheduled via
-/// GetNextRange() for a single context, these are processed in round-robin order.
-/// If there are multiple scan and write ranges for a disk, a read is always followed
-/// by a write, and a write is followed by a read, i.e. reads and writes alternate.
-/// If multiple write ranges are enqueued for a single disk, they will be processed
-/// by the disk threads in order, but may complete in any order. No guarantees are made
-/// on ordering of writes across disks.
-//
-/// Resource Management: effective resource management in the IoMgr is key to good
-/// performance. The IoMgr helps coordinate two resources: CPU and disk. For CPU,
-/// spinning up too many threads causes thrashing.
-/// Memory usage in the IoMgr comes from queued read buffers.  If we queue the minimum
-/// (i.e. 1), then the disks are idle while we are processing the buffer. If we don't
-/// limit the queue, then it is possible we end up queueing the entire data set (i.e. CPU
-/// is slower than disks) and run out of memory.
-/// For both CPU and memory, we want to model the machine as having a fixed amount of
-/// resources.  If a single query is running, it should saturate either CPU or Disk
-/// as well as using as little memory as possible. With multiple queries, each query
-/// should get less CPU. In that case each query will need fewer queued buffers and
-/// therefore have less memory usage.
-//
-/// The IoMgr defers CPU management to the caller. The IoMgr provides a GetNextRange
-/// API which will return the next scan range the caller should process. The caller
-/// can call this from the desired number of reading threads. Once a scan range
-/// has been returned via GetNextRange, the IoMgr will start to buffer reads for
-/// that range and it is expected the caller will pull those buffers promptly. For
-/// example, if the caller would like to have 1 scanner thread, the read loop
-/// would look like:
-///   while (more_ranges)
-///     range = GetNextRange()
-///     while (!range.eosr)
-///       buffer = range.GetNext()
-/// To have multiple reading threads, the caller would simply spin up the threads
-/// and each would process the loops above.
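
For reference, the loop above maps roughly onto the following C++ sketch against the API declared in this header; ProcessBuffer() is a hypothetical caller-side consumer, not part of the IoMgr interface:

    // Sketch only: drives one reader context until all queued ranges are consumed.
    Status DriveReader(DiskIoMgr* io_mgr, DiskIoRequestContext* reader) {
      while (true) {
        DiskIoMgr::ScanRange* range;
        RETURN_IF_ERROR(io_mgr->GetNextRange(reader, &range));
        if (range == nullptr) break;  // No more unstarted ranges.
        bool eosr = false;
        while (!eosr) {
          std::unique_ptr<DiskIoMgr::BufferDescriptor> buffer;
          RETURN_IF_ERROR(range->GetNext(&buffer));
          eosr = buffer->eosr();
          ProcessBuffer(buffer.get());              // Hypothetical caller-side work.
          io_mgr->ReturnBuffer(std::move(buffer));  // Required so the IoMgr can recycle memory.
        }
      }
      return Status::OK();
    }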
-//
-/// To control the number of IO buffers, each scan range has a limit of two queued
-/// buffers (SCAN_RANGE_READY_BUFFER_LIMIT). If the number of buffers is at capacity,
-/// the IoMgr will no longer read for that scan range until the caller has processed
-/// a buffer. Assuming the client returns each buffer before requesting the next one
-/// from the scan range, then this will consume up to 3 * 8MB = 24MB of I/O buffers per
-/// scan range.
-//
-/// Buffer Management:
-/// Buffers for reads are either a) allocated by the IoMgr and transferred to the caller,
-/// b) cached HDFS buffers if the scan range uses HDFS caching, or c) provided by the
-/// caller when constructing the scan range.
-///
-/// As a caller reads from a scan range, these buffers are wrapped in BufferDescriptors
-/// and returned to the caller. The caller must always call ReturnBuffer() on the buffer
-/// descriptor to allow recycling of the associated buffer (if there is an
-/// IoMgr-allocated or HDFS cached buffer).
-///
-/// Caching support:
-/// Scan ranges contain metadata on whether or not the range is cached on the DN. In that
-/// case, we use the HDFS APIs to read the cached data without doing any copies. For these
-/// ranges, the reads happen on the caller thread (as opposed to the disk threads).
-/// It is possible for the cached read APIs to fail, in which case the ranges are then
-/// queued on the disk threads and behave identically to the case where the range
-/// is not cached.
-/// Resources for these ranges are also not accounted against the reader because none
-/// are consumed.
-/// While a cached block is being processed, the block is mlocked. We want to minimize
-/// the time the mlock is held.
-///   - HDFS will time us out if we hold onto the mlock for too long
-///   - Holding the lock prevents uncaching this file due to a caching policy change.
-/// Therefore, we only issue the cached read when the caller is ready to process the
-/// range (GetNextRange()) instead of when the ranges are issued. This guarantees that
-/// there will be a CPU available to process the buffer and any throttling we do with
-/// the number of scanner threads properly controls the number of files we mlock.
-/// With cached scan ranges, we cannot close the scan range until the cached buffer
-/// is returned (HDFS does not allow this). We therefore need to defer the close until
-/// the cached buffer is returned (ReturnBuffer()).
-//
-/// Remote filesystem support (e.g. S3):
-/// Remote filesystems are modeled as "remote disks". That is, there is a separate disk
-/// queue for each supported remote filesystem type. In order to maximize throughput,
-/// multiple connections are opened in parallel by having multiple threads running per
-/// queue. Also note that reading from a remote filesystem service can be more CPU
-/// intensive than local disk/hdfs because of non-direct I/O and SSL processing, and can
-/// be CPU bottlenecked especially if not enough I/O threads for these queues are
-/// started.
-//
-/// TODO: IoMgr should be able to request additional scan ranges from the coordinator
-/// to help deal with stragglers.
-/// TODO: look into using a lock free queue
-/// TODO: simplify the common path (less locking, memory allocations).
-/// TODO: Break up the .h/.cc into multiple files under an /io subdirectory.
-//
-/// Structure of the Implementation:
-///  - All client APIs are defined in this file
-///  - Internal classes are defined in disk-io-mgr-internal.h
-///  - ScanRange APIs are implemented in disk-io-mgr-scan-range.cc
-///    This contains the ready buffer queue logic
-///  - DiskIoRequestContext APIs are implemented in disk-io-mgr-reader-context.cc
-///    This contains the logic for picking scan ranges for a reader.
-///  - Disk Thread and general APIs are implemented in disk-io-mgr.cc.
-
-class DiskIoRequestContext;
-
-// This is cache line aligned because the FileHandleCache needs cache line alignment
-// for its partitions.
-class DiskIoMgr : public CacheLineAligned {
- public:
-  class ScanRange;
-
-  /// Buffer struct that is used by the caller and IoMgr to pass read buffers.
-  /// It is expected that only one thread has ownership of this object at a
-  /// time.
-  class BufferDescriptor {
-   public:
-    ~BufferDescriptor() {
-      DCHECK(buffer_ == nullptr); // Check we didn't leak a buffer.
-    }
-
-    ScanRange* scan_range() { return scan_range_; }
-    uint8_t* buffer() { return buffer_; }
-    int64_t buffer_len() { return buffer_len_; }
-    int64_t len() { return len_; }
-    bool eosr() { return eosr_; }
-
-    /// Returns the offset within the scan range that this buffer starts at
-    int64_t scan_range_offset() const { return scan_range_offset_; }
-
-    /// Transfer ownership of buffer memory from 'mem_tracker_' to 'dst' and set
-    /// 'mem_tracker_' to 'dst'. 'mem_tracker_' and 'dst' must be non-NULL. Does not
-    /// check memory limits on 'dst': the caller should check the memory limit if a
-    /// different memory limit may apply to 'dst'. If the buffer was a client-provided
-    /// buffer, transferring is not allowed.
-    /// TODO: IMPALA-3209: revisit this as part of scanner memory usage revamp.
-    void TransferOwnership(MemTracker* dst);
-
-   private:
-    friend class DiskIoMgr;
-    friend class DiskIoMgr::ScanRange;
-    friend class DiskIoRequestContext;
-
-    /// Create a buffer descriptor for a new reader, range and data buffer. The buffer
-    /// memory should already be accounted against 'mem_tracker'.
-    BufferDescriptor(DiskIoMgr* io_mgr, DiskIoRequestContext* reader,
-        ScanRange* scan_range, uint8_t* buffer, int64_t buffer_len,
-        MemTracker* mem_tracker);
-
-    /// Return true if this is a cached buffer owned by HDFS.
-    bool is_cached() const {
-      return scan_range_->external_buffer_tag_
-          == ScanRange::ExternalBufferTag::CACHED_BUFFER;
-    }
-
-    /// Return true if this is a buffer owned by the client that was provided when
-    /// constructing the scan range.
-    bool is_client_buffer() const {
-      return scan_range_->external_buffer_tag_
-          == ScanRange::ExternalBufferTag::CLIENT_BUFFER;
-    }
-
-    DiskIoMgr* const io_mgr_;
-
-    /// Reader that this buffer is for.
-    DiskIoRequestContext* const reader_;
-
-    /// The current tracker this buffer is associated with. After initialisation,
-    /// NULL for cached buffers and non-NULL for all other buffers.
-    MemTracker* mem_tracker_;
-
-    /// Scan range that this buffer is for. Non-NULL when initialised.
-    ScanRange* const scan_range_;
-
-    /// buffer with the read contents
-    uint8_t* buffer_;
-
-    /// length of buffer_. For buffers from cached reads, the length is 0.
-    const int64_t buffer_len_;
-
-    /// length of read contents
-    int64_t len_ = 0;
-
-    /// true if the current scan range is complete
-    bool eosr_ = false;
-
-    /// Status of the read to this buffer. If status is not ok, 'buffer' is nullptr
-    Status status_;
-
-    int64_t scan_range_offset_ = 0;
-  };
-
-  /// The request type, read or write associated with a request range.
-  struct RequestType {
-    enum type {
-      READ,
-      WRITE,
-    };
-  };
-
-  /// Represents a contiguous sequence of bytes in a single file.
-  /// This is the common base class for read and write IO requests - ScanRange and
-  /// WriteRange. Each disk thread processes exactly one RequestRange at a time.
-  class RequestRange : public InternalQueue<RequestRange>::Node {
-   public:
-    hdfsFS fs() const { return fs_; }
-    const char* file() const { return file_.c_str(); }
-    std::string* file_string() { return &file_; }
-    int64_t offset() const { return offset_; }
-    int64_t len() const { return len_; }
-    int disk_id() const { return disk_id_; }
-    RequestType::type request_type() const { return request_type_; }
-
-   protected:
-    RequestRange(RequestType::type request_type)
-      : fs_(nullptr), offset_(-1), len_(-1), disk_id_(-1), request_type_(request_type) {}
-
-    /// Hadoop filesystem that contains file_, or set to nullptr for local filesystem.
-    hdfsFS fs_;
-
-    /// Path to file being read or written.
-    std::string file_;
-
-    /// Offset within file_ being read or written.
-    int64_t offset_;
-
-    /// Length of data read or written.
-    int64_t len_;
-
-    /// Id of disk containing byte range.
-    int disk_id_;
-
-    /// The type of IO request, READ or WRITE.
-    RequestType::type request_type_;
-  };
-
-  /// Param struct for different combinations of buffering.
-  struct BufferOpts {
-   public:
-    // Set options for a read into an IoMgr-allocated or HDFS-cached buffer. Caching is
-    // enabled if 'try_cache' is true, the file is in the HDFS cache and 'mtime' matches
-    // the modified time of the cached file in the HDFS cache.
-    BufferOpts(bool try_cache, int64_t mtime)
-      : try_cache_(try_cache),
-        mtime_(mtime),
-        client_buffer_(nullptr),
-        client_buffer_len_(-1) {}
-
-    /// Set options for an uncached read into an IoMgr-allocated buffer.
-    static BufferOpts Uncached() {
-      return BufferOpts(false, NEVER_CACHE, nullptr, -1);
-    }
-
-    /// Set options to read the entire scan range into 'client_buffer'. The length of the
-    /// buffer, 'client_buffer_len', must fit the entire scan range. HDFS caching is not
-    /// enabled in this case.
-    static BufferOpts ReadInto(uint8_t* client_buffer, int64_t client_buffer_len) {
-      return BufferOpts(false, NEVER_CACHE, client_buffer, client_buffer_len);
-    }
-
-   private:
-    friend class ScanRange;
-
-    BufferOpts(
-        bool try_cache, int64_t mtime, uint8_t* client_buffer, int64_t client_buffer_len)
-      : try_cache_(try_cache),
-        mtime_(mtime),
-        client_buffer_(client_buffer),
-        client_buffer_len_(client_buffer_len) {}
-
-    /// If 'mtime_' is set to NEVER_CACHE, the file handle will never be cached, because
-    /// the modification time won't match.
-    const static int64_t NEVER_CACHE = -1;
-
-    /// If true, read from HDFS cache if possible.
-    const bool try_cache_;
-
-    /// Last modified time of the file associated with the scan range. If set to
-    /// NEVER_CACHE, caching is disabled.
-    const int64_t mtime_;
-
-    /// A destination buffer provided by the client, nullptr and -1 if no buffer.
-    uint8_t* const client_buffer_;
-    const int64_t client_buffer_len_;
-  };
-
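
As a concrete illustration of the buffering options above, a sketch of describing a 1 KB range of a local file that is read directly into a caller-owned buffer via ScanRange::Reset() (declared just below); the path and disk id are made-up values:

    // Sketch: scan range over bytes [0, 1024) of a local file, read into a client buffer.
    std::vector<uint8_t> client_buf(1024);
    DiskIoMgr::ScanRange range;
    range.Reset(/*fs=*/nullptr, "/tmp/example.dat", /*len=*/1024, /*offset=*/0,
        /*disk_id=*/0, /*expected_local=*/true,
        DiskIoMgr::BufferOpts::ReadInto(client_buf.data(), client_buf.size()));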
-  /// ScanRange description. The caller must call Reset() to initialize the fields
-  /// before calling AddScanRanges(). The private fields are used internally by
-  /// the IoMgr.
-  class ScanRange : public RequestRange {
-   public:
-    ScanRange();
-
-    virtual ~ScanRange();
-
-    /// Resets this scan range object with the scan range description. The scan range
-    /// is for bytes [offset, offset + len) in 'file' on 'fs' (which is nullptr for the
-    /// local filesystem). The scan range must fall within the file bounds (offset >= 0
-    /// and offset + len <= file_length). 'disk_id' is the disk queue to add the range
-    /// to. If 'expected_local' is true, a warning is generated if the read did not
-    /// come from a local disk. 'buffer_opts' specifies buffer management options -
-    /// see the DiskIoMgr class comment and the BufferOpts comments for details.
-    /// 'meta_data' is an arbitrary client-provided pointer for any auxiliary data.
-    void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
-        bool expected_local, const BufferOpts& buffer_opts, void* meta_data = nullptr);
-
-    void* meta_data() const { return meta_data_; }
-    bool try_cache() const { return try_cache_; }
-    bool expected_local() const { return expected_local_; }
-
-    /// Returns the next buffer for this scan range. buffer is an output parameter.
-    /// This function blocks until a buffer is ready or an error occurred. If this is
-    /// called when all buffers have been returned, *buffer is set to nullptr and Status::OK
-    /// is returned.
-    /// Only one thread can be in GetNext() at any time.
-    Status GetNext(std::unique_ptr<BufferDescriptor>* buffer) WARN_UNUSED_RESULT;
-
-    /// Cancel this scan range. This cleans up all queued buffers and
-    /// wakes up any threads blocked on GetNext().
-    /// Status is the reason the range was cancelled. Must not be ok().
-    /// Status is returned to the user in GetNext().
-    void Cancel(const Status& status);
-
-    /// return a descriptive string for debug.
-    std::string DebugString() const;
-
-    int64_t mtime() const { return mtime_; }
-
-   private:
-    friend class DiskIoMgr;
-    friend class DiskIoRequestContext;
-
-    /// Initialize internal fields
-    void InitInternal(DiskIoMgr* io_mgr, DiskIoRequestContext* reader);
-
-    /// Enqueues a buffer for this range. This does not block.
-    /// Returns true if this scan range has hit the queue capacity, false otherwise.
-    /// The caller passes ownership of buffer to the scan range and it is not
-    /// valid to access buffer after this call. The reader lock must be held by the
-    /// caller.
-    bool EnqueueBuffer(const boost::unique_lock<boost::mutex>& reader_lock,
-        std::unique_ptr<BufferDescriptor> buffer);
-
-    /// Cleanup any queued buffers (i.e. due to cancellation). This cannot
-    /// be called with any locks taken.
-    void CleanupQueuedBuffers();
-
-    /// Validates the internal state of this range. lock_ must be taken
-    /// before calling this.
-    bool Validate();
-
-    /// Maximum length in bytes for hdfsRead() calls.
-    int64_t MaxReadChunkSize() const;
-
-    /// Opens the file for this range. This function only modifies state in this range.
-    /// If 'use_file_handle_cache' is true and this is a local hdfs file, then this scan
-    /// range will not maintain an exclusive file handle. It will borrow an hdfs file
-    /// handle from the file handle cache for each Read(), so Open() does nothing.
-    /// If 'use_file_handle_cache' is false or this is a remote hdfs file or this is
-    /// a local OS file, Open() will maintain a file handle on the scan range for
-    /// exclusive use by this scan range. An exclusive hdfs file handle still comes
-    /// from the cache, but it is a newly opened file handle that is held for the
-    /// entire duration of a scan range's lifetime and destroyed in Close().
-    /// All local OS files are opened using normal OS file APIs.
-    Status Open(bool use_file_handle_cache) WARN_UNUSED_RESULT;
-
-    /// Closes the file for this range. This function only modifies state in this range.
-    void Close();
-
-    /// Reads from this range into 'buffer', which has length 'buffer_len' bytes. Returns
-    /// the number of bytes read. The read position in this scan range is updated.
-    Status Read(uint8_t* buffer, int64_t buffer_len, int64_t* bytes_read,
-        bool* eosr) WARN_UNUSED_RESULT;
-
-    /// Get the read statistics from the Hdfs file handle and aggregate them to
-    /// the DiskIoRequestContext. This clears the statistics on this file handle.
-    /// It is safe to pass hdfsFile by value, as hdfsFile's underlying type is a
-    /// pointer.
-    void GetHdfsStatistics(hdfsFile fh);
-
-    /// Reads from the DN cache. On success, sets cached_buffer_ to the DN buffer
-    /// and *read_succeeded to true.
-    /// If the data is not cached, returns ok() and *read_succeeded is set to false.
-    /// Returns a non-ok status if it ran into a non-continuable error.
-    ///  The reader lock must be held by the caller.
-    Status ReadFromCache(const boost::unique_lock<boost::mutex>& reader_lock,
-        bool* read_succeeded) WARN_UNUSED_RESULT;
-
-    /// Pointer to caller specified metadata. This is untouched by the io manager
-    /// and the caller can put whatever auxiliary data in here.
-    void* meta_data_ = nullptr;
-
-    /// If true, this scan range is expected to be cached. Note that this might be wrong
-    /// since the block could have been uncached. In that case, the cached path
-    /// will fail and we'll just put the scan range on the normal read path.
-    bool try_cache_ = false;
-
-    /// If true, we expect this scan range to be a local read. Note that if this is false,
-    /// it does not necessarily mean we expect the read to be remote, and that we never
-    /// create scan ranges where some of the range is expected to be remote and some of it
-    /// local.
-    /// TODO: we can do more with this
-    bool expected_local_ = false;
-
-    /// Total number of bytes read remotely. This is necessary to maintain a count of
-    /// the number of remote scan ranges. Since IO statistics can be collected multiple
-    /// times for a scan range, it is necessary to keep some state about whether this
-    /// scan range has already been counted as remote. There is also a requirement to
-    /// log the number of unexpected remote bytes for a scan range. To solve both
-    /// requirements, maintain num_remote_bytes_ on the ScanRange and push it to the
-    /// reader_ once at the close of the scan range.
-    int64_t num_remote_bytes_;
-
-    DiskIoMgr* io_mgr_ = nullptr;
-
-    /// Reader/owner of the scan range
-    DiskIoRequestContext* reader_ = nullptr;
-
-    /// File handle either to hdfs or local fs (FILE*)
-    /// The hdfs file handle is only stored here in three cases:
-    /// 1. The file handle cache is off (max_cached_file_handles == 0).
-    /// 2. The scan range is using hdfs caching.
-    /// -OR-
-    /// 3. The hdfs file is expected to be remote (expected_local_ == false)
-    /// In each case, the scan range gets a new file handle from the file handle cache
-    /// at Open(), holds it exclusively, and destroys it in Close().
-    union {
-      FILE* local_file_ = nullptr;
-      HdfsFileHandle* exclusive_hdfs_fh_;
-    };
-
-    /// Tagged union that holds a buffer for the cases when there is a buffer allocated
-    /// externally from DiskIoMgr that is associated with the scan range.
-    enum class ExternalBufferTag { CLIENT_BUFFER, CACHED_BUFFER, NO_BUFFER };
-    ExternalBufferTag external_buffer_tag_;
-    union {
-      /// Valid if the 'external_buffer_tag_' is CLIENT_BUFFER.
-      struct {
-        /// Client-provided buffer to read the whole scan range into.
-        uint8_t* data;
-
-        /// Length of the client-provided buffer.
-        int64_t len;
-      } client_buffer_;
-
-      /// Valid and non-NULL if the external_buffer_tag_ is CACHED_BUFFER, which means
-      /// that a cached read succeeded and all the bytes for the range are in this buffer.
-      struct hadoopRzBuffer* cached_buffer_ = nullptr;
-    };
-
-    /// Lock protecting fields below.
-    /// This lock should not be taken during Open()/Read()/Close().
-    /// If DiskIoRequestContext::lock_ and this lock need to be held simultaneously,
-    /// DiskIoRequestContext::lock_ must be taken first.
-    boost::mutex lock_;
-
-    /// Number of bytes read so far for this scan range
-    int bytes_read_;
-
-    /// Status for this range. This is non-ok if is_cancelled_ is true.
-    /// Note: an individual range can fail without the DiskIoRequestContext being
-    /// cancelled. This allows us to skip individual ranges.
-    Status status_;
-
-    /// If true, the last buffer for this scan range has been queued.
-    bool eosr_queued_ = false;
-
-    /// If true, the last buffer for this scan range has been returned.
-    bool eosr_returned_ = false;
-
-    /// If true, this scan range has been removed from the reader's in_flight_ranges
-    /// queue because the ready_buffers_ queue is full.
-    bool blocked_on_queue_ = false;
-
-    /// IO buffers that are queued for this scan range.
-    /// Condition variable for GetNext
-    ConditionVariable buffer_ready_cv_;
-    std::deque<std::unique_ptr<BufferDescriptor>> ready_buffers_;
-
-    /// Lock that should be taken during hdfs calls. Only one thread (the disk reading
-    /// thread) calls into hdfs at a time so this lock does not have performance impact.
-    /// This lock only serves to coordinate cleanup. Specifically it serves to ensure
-    /// that the disk threads are finished with HDFS calls before is_cancelled_ is set
-    /// to true and cleanup starts.
-    /// If this lock and lock_ need to be taken, lock_ must be taken first.
-    boost::mutex hdfs_lock_;
-
-    /// If true, this scan range has been cancelled.
-    bool is_cancelled_ = false;
-
-    /// Last modified time of the file associated with the scan range
-    int64_t mtime_;
-  };
-
-  /// Used to specify data to be written to a file and offset.
-  /// It is the responsibility of the client to ensure that the data to be written is
-  /// valid and that the file to be written to exists until the callback is invoked.
-  /// A callback is invoked to inform the client when the write is done.
-  class WriteRange : public RequestRange {
-   public:
-    /// This callback is invoked on each WriteRange after the write is complete or the
-    /// context is cancelled. The status returned by the callback parameter indicates
-    /// if the write was successful (i.e. Status::OK), if there was an error
-    /// (TStatusCode::RUNTIME_ERROR) or if the context was cancelled
-    /// (TStatusCode::CANCELLED). The callback is only invoked if this WriteRange was
-    /// successfully added (i.e. AddWriteRange() succeeded). No locks are held while
-    /// the callback is invoked.
-    typedef std::function<void(const Status&)> WriteDoneCallback;
-    WriteRange(const std::string& file, int64_t file_offset, int disk_id,
-        WriteDoneCallback callback);
-
-    /// Change the file and offset of this write range. Data and callbacks are unchanged.
-    /// Can only be called when the write is not in flight (i.e. before AddWriteRange()
-    /// is called or after the write callback was called).
-    void SetRange(const std::string& file, int64_t file_offset, int disk_id);
-
-    /// Set the data and number of bytes to be written for this WriteRange.
-    /// Can only be called when the write is not in flight (i.e. before AddWriteRange()
-    /// is called or after the write callback was called).
-    void SetData(const uint8_t* buffer, int64_t len);
-
-    const uint8_t* data() const { return data_; }
-
-   private:
-    friend class DiskIoMgr;
-    friend class DiskIoRequestContext;
-
-    /// Data to be written. RequestRange::len_ contains the length of data
-    /// to be written.
-    const uint8_t* data_;
-
-    /// Callback to invoke after the write is complete.
-    WriteDoneCallback callback_;
-  };
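
A sketch of how a client might queue an asynchronous write with this class; the path, disk id and logging are illustrative, and the lifetime of the WriteRange and data buffer is left to the caller:

    // Sketch: queue a write of a caller-owned buffer; 'data' must stay valid until the
    // callback runs, and the caller manages the WriteRange's lifetime (not shown here).
    void QueueWrite(DiskIoMgr* io_mgr, DiskIoRequestContext* writer,
        const uint8_t* data, int64_t len) {
      auto callback = [](const Status& write_status) {
        // Runs on a disk thread after the write completes or the context is cancelled.
        if (!write_status.ok()) LOG(WARNING) << write_status.GetDetail();
      };
      DiskIoMgr::WriteRange* range = new DiskIoMgr::WriteRange(
          "/tmp/example.out", /*file_offset=*/0, /*disk_id=*/0, callback);
      range->SetData(data, len);
      Status status = io_mgr->AddWriteRange(writer, range);
      if (!status.ok()) LOG(WARNING) << status.GetDetail();  // Callback will not be invoked.
    }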
-
-  /// Create a DiskIoMgr object. This constructor is only used for testing.
-  ///  - num_disks: The number of disks the IoMgr should use. This is used for testing.
-  ///    Specify 0 to have the disk IoMgr query the OS for the number of disks.
-  ///  - threads_per_rotational_disk: number of read threads to create per rotational
-  ///    disk. This is also the max queue depth.
-  ///  - threads_per_solid_state_disk: number of read threads to create per solid state
-  ///    disk. This is also the max queue depth.
-  ///  - min_buffer_size: minimum io buffer size (in bytes)
-  ///  - max_buffer_size: maximum io buffer size (in bytes). Also the max read size.
-  DiskIoMgr(int num_disks, int threads_per_rotational_disk,
-      int threads_per_solid_state_disk, int min_buffer_size, int max_buffer_size);
-
-  /// Create DiskIoMgr with default configs.
-  DiskIoMgr();
-
-  /// Clean up all threads and resources. This is mostly useful for testing since
-  /// for impalad, this object is never destroyed.
-  ~DiskIoMgr();
-
-  /// Initialize the IoMgr. Must be called once before any of the other APIs.
-  Status Init(MemTracker* process_mem_tracker) WARN_UNUSED_RESULT;
-
-  /// Allocates tracking structure for a request context.
-  /// Register a new request context and return it to the caller. The caller must call
-  /// UnregisterContext() for each context.
-  /// reader_mem_tracker: Is non-null only for readers. IO buffers
-  ///    used for this reader will be tracked by this. If the limit is exceeded
-  ///    the reader will be cancelled and MEM_LIMIT_EXCEEDED will be returned via
-  ///    GetNext().
-  std::unique_ptr<DiskIoRequestContext> RegisterContext(MemTracker* reader_mem_tracker);
-
-  /// Unregisters context from the disk IoMgr by first cancelling it then blocking until
-  /// all references to the context are removed from I/O manager internal data structures.
-  /// This must be called for every RegisterContext() to ensure that the context object
-  /// can be safely destroyed. It is invalid to add more request ranges to 'context'
-  /// after this call. This call blocks until all the disk threads have finished cleaning
-  /// up.
-  void UnregisterContext(DiskIoRequestContext* context);
-
-  /// This function cancels the context asynchronously. All outstanding requests
-  /// are aborted and tracking structures cleaned up. This does not need to be
-  /// called if the context finishes normally.
-  /// This will also fail any outstanding GetNext()/Read requests.
-  void CancelContext(DiskIoRequestContext* context);
-
-  /// Adds the scan ranges to the queues. This call is non-blocking. The caller must
-  /// not deallocate the scan range pointers before UnregisterContext().
-  /// If schedule_immediately, the ranges are immediately put on the read queue
-  /// (i.e. the caller should not/cannot call GetNextRange for these ranges).
-  /// This can be used to do synchronous reads as well as schedule dependent ranges,
-  /// as in the case for columnar formats.
-  Status AddScanRanges(DiskIoRequestContext* reader,
-      const std::vector<ScanRange*>& ranges,
-      bool schedule_immediately = false) WARN_UNUSED_RESULT;
-  Status AddScanRange(DiskIoRequestContext* reader, ScanRange* range,
-      bool schedule_immediately = false) WARN_UNUSED_RESULT;
-
-  /// Add a WriteRange for the writer. This is non-blocking and schedules the context
-  /// on the IoMgr disk queue. Does not create any files.
-  Status AddWriteRange(
-      DiskIoRequestContext* writer, WriteRange* write_range) WARN_UNUSED_RESULT;
-
-  /// Returns the next unstarted scan range for this reader. When the range is returned,
-  /// the disk threads in the IoMgr will already have started reading from it. The
-  /// caller is expected to call ScanRange::GetNext on the returned range.
-  /// If there are no more unstarted ranges, nullptr is returned.
-  /// This call is blocking.
-  Status GetNextRange(DiskIoRequestContext* reader, ScanRange** range) WARN_UNUSED_RESULT;
-
-  /// Reads the range and returns the result in buffer.
-  /// This behaves like the typical synchronous read() api, blocking until the data
-  /// is read. This can be called while there are outstanding ScanRanges and is
-  /// thread safe. Multiple threads can be calling Read() per reader at a time.
-  /// range *cannot* have already been added via AddScanRanges.
-  /// This can only be used if the scan range fits in a single IO buffer (i.e. is smaller
-  /// than max_read_buffer_size()) or if reading into a client-provided buffer.
-  Status Read(DiskIoRequestContext* reader, ScanRange* range,
-      std::unique_ptr<BufferDescriptor>* buffer) WARN_UNUSED_RESULT;
-
-  /// Returns the buffer to the IoMgr. This must be called for every buffer
-  /// returned by GetNext()/Read() that did not return an error. This is non-blocking.
-  /// After calling this, the buffer descriptor is invalid and cannot be accessed.
-  void ReturnBuffer(std::unique_ptr<BufferDescriptor> buffer);
-
-  /// Determine which disk queue this file should be assigned to.  Returns an index into
-  /// disk_queues_.  The disk_id is the volume ID for the local disk that holds the
-  /// files, or -1 if unknown.  Flag expected_local is true iff this impalad is
-  /// co-located with the datanode for this file.
-  int AssignQueue(const char* file, int disk_id, bool expected_local);
-
-  /// TODO: The functions below can be moved to DiskIoRequestContext.
-  /// Returns the current status of the context.
-  Status context_status(DiskIoRequestContext* context) const WARN_UNUSED_RESULT;
-
-  void set_bytes_read_counter(DiskIoRequestContext*, RuntimeProfile::Counter*);
-  void set_read_timer(DiskIoRequestContext*, RuntimeProfile::Counter*);
-  void set_active_read_thread_counter(DiskIoRequestContext*, RuntimeProfile::Counter*);
-  void set_disks_access_bitmap(DiskIoRequestContext*, RuntimeProfile::Counter*);
-
-  int64_t queue_size(DiskIoRequestContext* reader) const;
-  int64_t bytes_read_local(DiskIoRequestContext* reader) const;
-  int64_t bytes_read_short_circuit(DiskIoRequestContext* reader) const;
-  int64_t bytes_read_dn_cache(DiskIoRequestContext* reader) const;
-  int num_remote_ranges(DiskIoRequestContext* reader) const;
-  int64_t unexpected_remote_bytes(DiskIoRequestContext* reader) const;
-  int cached_file_handles_hit_count(DiskIoRequestContext* reader) const;
-  int cached_file_handles_miss_count(DiskIoRequestContext* reader) const;
-
-  /// Returns the read throughput across all readers.
-  /// TODO: should this be a sliding window?  This should report metrics for the
-  /// last minute, hour and since the beginning.
-  int64_t GetReadThroughput();
-
-  /// Returns the maximum read buffer size
-  int max_read_buffer_size() const { return max_buffer_size_; }
-
-  /// Returns the total number of disk queues (both local and remote).
-  int num_total_disks() const { return disk_queues_.size(); }
-
-  /// Returns the total number of remote "disk" queues.
-  int num_remote_disks() const { return REMOTE_NUM_DISKS; }
-
-  /// Returns the number of local disks attached to the system.
-  int num_local_disks() const { return num_total_disks() - num_remote_disks(); }
-
-  /// The disk ID (and therefore disk_queues_ index) used for DFS accesses.
-  int RemoteDfsDiskId() const { return num_local_disks() + REMOTE_DFS_DISK_OFFSET; }
-
-  /// The disk ID (and therefore disk_queues_ index) used for S3 accesses.
-  int RemoteS3DiskId() const { return num_local_disks() + REMOTE_S3_DISK_OFFSET; }
-
-  /// The disk ID (and therefore disk_queues_ index) used for ADLS accesses.
-  int RemoteAdlsDiskId() const { return num_local_disks() + REMOTE_ADLS_DISK_OFFSET; }
-
-  /// Dumps the disk IoMgr queues (for readers and disks)
-  std::string DebugString();
-
-  /// Validates the internal state is consistent. This is intended to only be used
-  /// for debugging.
-  bool Validate() const;
-
-  /// Given a FS handle, name and last modified time of the file, gets an HdfsFileHandle
-  /// from the file handle cache. If 'require_new_handle' is true, the cache will open
-  /// a fresh file handle. On success, records statistics about whether this was
-  /// a cache hit or miss in the 'reader' as well as at the system level. In case of an
-  /// error returns nullptr.
-  HdfsFileHandle* GetCachedHdfsFileHandle(const hdfsFS& fs,
-      std::string* fname, int64_t mtime, DiskIoRequestContext *reader,
-      bool require_new_handle);
-
-  /// Releases a file handle back to the file handle cache when it is no longer in use.
-  /// If 'destroy_handle' is true, the file handle cache will close the file handle
-  /// immediately.
-  void ReleaseCachedHdfsFileHandle(std::string* fname, HdfsFileHandle* fid,
-      bool destroy_handle);
-
-  /// Reopens a file handle by destroying the file handle and getting a fresh
-  /// file handle from the cache. Returns an error if the file could not be reopened.
-  Status ReopenCachedHdfsFileHandle(const hdfsFS& fs, std::string* fname, int64_t mtime,
-      HdfsFileHandle** fid);
-
-  /// Garbage collect unused I/O buffers up to 'bytes_to_free', or all the buffers if
-  /// 'bytes_to_free' is -1.
-  void GcIoBuffers(int64_t bytes_to_free = -1);
-
-  /// The maximum number of ready buffers that can be queued in a scan range. Having two
-  /// queued buffers (plus the buffer that is returned to the client) gives good
-  /// performance in most scenarios:
-  /// 1. If the consumer is consuming data faster than we can read from disk, then the
-  ///    queue will be empty most of the time because the buffer will be immediately
-  ///    pulled off the queue as soon as it is added. There will always be an I/O request
-  ///    in the disk queue to maximize I/O throughput, which is the bottleneck in this
-  ///    case.
-  /// 2. If we can read from disk faster than the consumer is consuming data, the queue
-  ///    will fill up and there will always be a buffer available for the consumer to
-  ///    read, so the consumer will not block and we maximize consumer throughput, which
-  ///    is the bottleneck in this case.
-  /// 3. If the consumer is consuming data at approximately the same rate as we are
-  ///    reading from disk, then the steady state is that the consumer is processing one
-  ///    buffer and one buffer is in the disk queue. The additional buffer can absorb
-  ///    bursts where the producer runs faster than the consumer or the consumer runs
-  ///    faster than the producer without blocking either the producer or consumer.
-  static const int SCAN_RANGE_READY_BUFFER_LIMIT = 2;
-
-  /// "Disk" queue offsets for remote accesses.  Offset 0 corresponds to
-  /// disk ID (i.e. disk_queue_ index) of num_local_disks().
-  enum {
-    REMOTE_DFS_DISK_OFFSET = 0,
-    REMOTE_S3_DISK_OFFSET,
-    REMOTE_ADLS_DISK_OFFSET,
-    REMOTE_NUM_DISKS
-  };
-
- private:
-  friend class BufferDescriptor;
-  friend class DiskIoRequestContext;
-  struct DiskQueue;
-
-  friend class DiskIoMgrTest_Buffers_Test;
-  friend class DiskIoMgrTest_VerifyNumThreadsParameter_Test;
-
-  /// Memory tracker for unused I/O buffers owned by DiskIoMgr.
-  boost::scoped_ptr<MemTracker> free_buffer_mem_tracker_;
-
-  /// Memory tracker for I/O buffers where the DiskIoRequestContext has no MemTracker.
-  /// TODO: once IMPALA-3200 is fixed, there should be no more cases where readers don't
-  /// provide a MemTracker.
-  boost::scoped_ptr<MemTracker> unowned_buffer_mem_tracker_;
-
-  /// Number of worker(read) threads per rotational disk. Also the max depth of queued
-  /// work to the disk.
-  const int num_io_threads_per_rotational_disk_;
-
-  /// Number of worker(read) threads per solid state disk. Also the max depth of queued
-  /// work to the disk.
-  const int num_io_threads_per_solid_state_disk_;
-
-  /// Maximum read size. This is also the maximum size of each allocated buffer.
-  const int max_buffer_size_;
-
-  /// The minimum size of each read buffer.
-  const int min_buffer_size_;
-
-  /// Thread group containing all the worker threads.
-  ThreadGroup disk_thread_group_;
-
-  /// Options object for cached hdfs reads. Set on startup and never modified.
-  struct hadoopRzOptions* cached_read_options_ = nullptr;
-
-  /// True if the IoMgr should be torn down. Worker threads watch for this to
-  /// know to terminate. This variable is read/written to by different threads.
-  volatile bool shut_down_;
-
-  /// Total bytes read by the IoMgr.
-  RuntimeProfile::Counter total_bytes_read_counter_;
-
-  /// Total time spent in hdfs reading
-  RuntimeProfile::Counter read_timer_;
-
-  /// Protects free_buffers_
-  boost::mutex free_buffers_lock_;
-
-  /// Free buffers that can be handed out to clients. There is one list for each buffer
-  /// size, indexed by the Log2 of the buffer size in units of min_buffer_size_. The
-  /// maximum buffer size is max_buffer_size_, so the maximum index is
-  /// Log2(max_buffer_size_ / min_buffer_size_).
-  //
-  /// E.g. if min_buffer_size_ = 1024 bytes:
-  ///  free_buffers_[0]  => list of free buffers with size 1024 B
-  ///  free_buffers_[1]  => list of free buffers with size 2048 B
-  ///  free_buffers_[10] => list of free buffers with size 1 MB
-  ///  free_buffers_[13] => list of free buffers with size 8 MB
-  ///  free_buffers_[n]  => list of free buffers with size 2^n * 1024 B
-  std::vector<std::deque<uint8_t*>> free_buffers_;
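
As a worked example of this indexing scheme (a simplified sketch, not the exact free_buffers_idx() helper declared further below):

    // Index of the free-buffer list for 'buffer_size', assuming both sizes are powers of
    // two. With min_buffer_size = 1024, an 8 MB buffer maps to index 13 (8 MB = 2^13 KB).
    int FreeBuffersIdx(int64_t buffer_size, int64_t min_buffer_size) {
      int idx = 0;
      for (int64_t units = buffer_size / min_buffer_size; units > 1; units >>= 1) ++idx;
      return idx;
    }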
-
-  /// Total number of allocated buffers, used for debugging.
-  AtomicInt32 num_allocated_buffers_;
-
-  /// Total number of buffers in readers
-  AtomicInt32 num_buffers_in_readers_;
-
-  /// Per disk queues. This is static and created once at Init() time.  One queue is
-  /// allocated for each local disk on the system and for each remote filesystem type.
-  /// It is indexed by disk id.
-  std::vector<DiskQueue*> disk_queues_;
-
-  /// The next disk queue to write to if the actual 'disk_id_' is unknown (i.e. the file
-  /// is not associated with a particular local disk or remote queue). Used to implement
-  /// round-robin assignment for that case.
-  static AtomicInt32 next_disk_id_;
-
-  // Number of file handle cache partitions to use
-  static const size_t NUM_FILE_HANDLE_CACHE_PARTITIONS = 16;
-
-  // Caching structure that maps file names to cached file handles. The cache has an upper
-  // limit of entries defined by FLAGS_max_cached_file_handles. Evicted cached file
-  // handles are closed.
-  FileHandleCache<NUM_FILE_HANDLE_CACHE_PARTITIONS> file_handle_cache_;
-
-  /// Returns the index into free_buffers_ for a given buffer size
-  int free_buffers_idx(int64_t buffer_size);
-
-  /// Returns a buffer to read into with size between 'buffer_size' and
-  /// 'max_buffer_size_'. If there is an appropriately-sized free buffer in the
-  /// 'free_buffers_', that is returned, otherwise a new one is allocated.
-  /// The returned *buffer_size must be between 0 and 'max_buffer_size_'.
-  /// The buffer memory is tracked against reader's mem tracker, or
-  /// 'unowned_buffer_mem_tracker_' if the reader does not have one.
-  std::unique_ptr<BufferDescriptor> GetFreeBuffer(
-      DiskIoRequestContext* reader, ScanRange* range, int64_t buffer_size);
-
-  /// Disassociates the desc->buffer_ memory from 'desc' (which cannot be nullptr), either
-  /// freeing it or returning it to 'free_buffers_'. Memory tracking is updated to
-  /// reflect the transfer of ownership from desc->mem_tracker_ to the disk I/O mgr.
-  void FreeBufferMemory(BufferDescriptor* desc);
-
-  /// Disk worker thread loop. This function retrieves the next range to process on
-  /// the disk queue and invokes ReadRange() or Write() depending on the type of Range().
-  /// There can be multiple threads per disk running this loop.
-  void WorkLoop(DiskQueue* queue);
-
-  /// This is called from the disk thread to get the next range to process. It will
-  /// wait until a scan range and buffer are available, or a write range is available.
-  /// This function returns the range to process.
-  /// Only returns false if the disk thread should be shut down.
-  /// No locks should be taken before this function call and none are left taken after.
-  bool GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
-      DiskIoRequestContext** request_context);
-
-  /// Updates disk queue and reader state after a read is complete. The read result
-  /// is captured in the buffer descriptor.
-  void HandleReadFinished(DiskQueue* disk_queue, DiskIoRequestContext* reader,
-      std::unique_ptr<BufferDescriptor> buffer);
-
-  /// Invokes write_range->callback_  after the range has been written and
-  /// updates per-disk state and handle state. The status of the write OK/RUNTIME_ERROR
-  /// etc. is passed via write_status and to the callback.
-  /// The write_status does not affect the writer->status_. That is, a write error does
-  /// not cancel the writer context - that decision is left to the callback handler.
-  /// TODO: On the read path, consider not canceling the reader context on error.
-  void HandleWriteFinished(
-      DiskIoRequestContext* writer, WriteRange* write_range, const Status& write_status);
-
-  /// Validates that range is correctly initialized
-  Status ValidateScanRange(ScanRange* range) WARN_UNUSED_RESULT;
-
-  /// Writes the specified range to disk and calls HandleWriteFinished when done.
-  /// Responsible for opening and closing the file that is written.
-  void Write(DiskIoRequestContext* writer_context, WriteRange* write_range);
-
-  /// Helper method to write a range using the specified FILE handle. Returns Status::OK
-  /// if the write succeeded, or a RUNTIME_ERROR with an appropriate message otherwise.
-  /// Does not open or close the file that is written.
-  Status WriteRangeHelper(FILE* file_handle, WriteRange* write_range) WARN_UNUSED_RESULT;
-
-  /// Reads the specified scan range and calls HandleReadFinished when done.
-  void ReadRange(DiskQueue* disk_queue, DiskIoRequestContext* reader, ScanRange* range);
-
-  /// Try to allocate the next buffer for the scan range, returning the new buffer
-  /// if successful. If 'reader' is cancelled, cancels the range and returns nullptr.
-  /// If there is memory pressure and buffers are already queued, adds the range
-  /// to the blocked ranges and returns nullptr.
-  std::unique_ptr<BufferDescriptor> TryAllocateNextBufferForRange(DiskQueue* disk_queue,
-      DiskIoRequestContext* reader, ScanRange* range, int64_t buffer_size);
-};
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 999b56a..6fb572c 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -36,7 +36,7 @@
 #include "runtime/client-cache.h"
 #include "runtime/coordinator.h"
 #include "runtime/data-stream-mgr.h"
-#include "runtime/disk-io-mgr.h"
+#include "runtime/io/disk-io-mgr.h"
 #include "runtime/hbase-table-factory.h"
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/krpc-data-stream-mgr.h"
@@ -164,7 +164,7 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int krpc_port,
             FLAGS_catalog_client_rpc_timeout_ms, FLAGS_catalog_client_rpc_timeout_ms, "",
             !FLAGS_ssl_client_ca_certificate.empty())),
     htable_factory_(new HBaseTableFactory()),
-    disk_io_mgr_(new DiskIoMgr()),
+    disk_io_mgr_(new io::DiskIoMgr()),
     webserver_(new Webserver(webserver_port)),
     pool_mem_trackers_(new PoolMemTrackerRegistry),
     thread_mgr_(new ThreadResourceMgr),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 8fafdc5..193fdde 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -43,7 +43,6 @@ class BufferPool;
 class CallableThreadPool;
 class DataStreamMgrBase;
 class DataStreamMgr;
-class DiskIoMgr;
 class QueryExecMgr;
 class Frontend;
 class HBaseTableFactory;
@@ -65,6 +64,10 @@ class ThreadResourceMgr;
 class TmpFileMgr;
 class Webserver;
 
+namespace io {
+  class DiskIoMgr;
+}
+
 /// Execution environment for Impala daemon. Contains all required global structures, and
 /// handles to singleton services. Clients must call StartServices() exactly once to
 /// properly initialise service state.
@@ -116,7 +119,7 @@ class ExecEnv {
     return catalogd_client_cache_.get();
   }
   HBaseTableFactory* htable_factory() { return htable_factory_.get(); }
-  DiskIoMgr* disk_io_mgr() { return disk_io_mgr_.get(); }
+  io::DiskIoMgr* disk_io_mgr() { return disk_io_mgr_.get(); }
   Webserver* webserver() { return webserver_.get(); }
   MetricGroup* metrics() { return metrics_.get(); }
   MemTracker* process_mem_tracker() { return mem_tracker_.get(); }
@@ -174,7 +177,7 @@ class ExecEnv {
   boost::scoped_ptr<ImpalaBackendClientCache> impalad_client_cache_;
   boost::scoped_ptr<CatalogServiceClientCache> catalogd_client_cache_;
   boost::scoped_ptr<HBaseTableFactory> htable_factory_;
-  boost::scoped_ptr<DiskIoMgr> disk_io_mgr_;
+  boost::scoped_ptr<io::DiskIoMgr> disk_io_mgr_;
   boost::scoped_ptr<Webserver> webserver_;
   boost::scoped_ptr<MemTracker> mem_tracker_;
   boost::scoped_ptr<PoolMemTrackerRegistry> pool_mem_trackers_;
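
In isolation, the pattern used here (a forward declaration inside a nested namespace plus a smart-pointer member) looks roughly like the sketch below; ExecEnvLike is a hypothetical stand-in for ExecEnv and the destructor must be defined in a .cc file where io::DiskIoMgr is a complete type:

    #include <boost/scoped_ptr.hpp>

    namespace impala {
    namespace io {
    class DiskIoMgr;  // Incomplete type: enough to declare a scoped_ptr member.
    }

    class ExecEnvLike {
     public:
      ExecEnvLike();
      ~ExecEnvLike();  // Defined in the .cc, which includes runtime/io/disk-io-mgr.h.
      io::DiskIoMgr* disk_io_mgr() { return disk_io_mgr_.get(); }
     private:
      boost::scoped_ptr<io::DiskIoMgr> disk_io_mgr_;
    };
    }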

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/io/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/CMakeLists.txt b/be/src/runtime/io/CMakeLists.txt
new file mode 100644
index 0000000..ae89509
--- /dev/null
+++ b/be/src/runtime/io/CMakeLists.txt
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# where to put generated libraries
+set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime/io")
+
+# where to put generated binaries
+set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime/io")
+
+add_library(Io
+  disk-io-mgr.cc
+  disk-io-mgr-stress.cc
+  request-context.cc
+  scan-range.cc
+)
+add_dependencies(Io gen-deps)
+
+# This test runs forever so should not be part of 'make test'
+add_executable(disk-io-mgr-stress-test disk-io-mgr-stress-test.cc)
+target_link_libraries(disk-io-mgr-stress-test ${IMPALA_TEST_LINK_LIBS})
+
+ADD_BE_TEST(disk-io-mgr-test)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/io/disk-io-mgr-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-internal.h b/be/src/runtime/io/disk-io-mgr-internal.h
new file mode 100644
index 0000000..3fc3895
--- /dev/null
+++ b/be/src/runtime/io/disk-io-mgr-internal.h
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_DISK_IO_MGR_INTERNAL_H
+#define IMPALA_RUNTIME_DISK_IO_MGR_INTERNAL_H
+
+#include <unistd.h>
+#include <queue>
+#include <boost/thread/locks.hpp>
+#include <gutil/strings/substitute.h>
+
+#include "common/logging.h"
+#include "runtime/io/request-context.h"
+#include "runtime/io/disk-io-mgr.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/thread-resource-mgr.h"
+#include "util/condition-variable.h"
+#include "util/cpu-info.h"
+#include "util/debug-util.h"
+#include "util/disk-info.h"
+#include "util/filesystem-util.h"
+#include "util/hdfs-util.h"
+#include "util/impalad-metrics.h"
+
+/// This file contains internal structures shared between submodules of the IoMgr. Users
+/// of the IoMgr do not need to include this file.
+namespace impala {
+namespace io {
+
+/// Per disk state
+struct DiskIoMgr::DiskQueue {
+  /// Disk id (0-based)
+  int disk_id;
+
+  /// Lock that protects access to 'request_contexts' and 'work_available'
+  boost::mutex lock;
+
+  /// Condition variable to signal the disk threads that there is work to do or the
+  /// thread should shut down.  A disk thread will be woken up when there is a reader
+  /// added to the queue. A reader is only on the queue when it has at least one
+  /// scan range that is not blocked on available buffers.
+  ConditionVariable work_available;
+
+  /// list of all request contexts that have work queued on this disk
+  std::list<RequestContext*> request_contexts;
+
+  /// Enqueue the request context to the disk queue.  The DiskQueue lock must not be taken.
+  inline void EnqueueContext(RequestContext* worker) {
+    {
+      boost::unique_lock<boost::mutex> disk_lock(lock);
+      /// Check that the reader is not already on the queue
+      DCHECK(find(request_contexts.begin(), request_contexts.end(), worker) ==
+          request_contexts.end());
+      request_contexts.push_back(worker);
+    }
+    work_available.NotifyAll();
+  }
+
+  DiskQueue(int id) : disk_id(id) {}
+};
+}
+}
+
+#endif
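
For context, the consumer side of 'work_available' in the disk threads looks roughly like the sketch below; it is simplified relative to the real DiskIoMgr::GetNextRequestRange() in disk-io-mgr.cc, and 'shut_down' is a placeholder for the IoMgr's shutdown flag:

    // Sketch: a disk thread blocks until a context has work (or shutdown is requested),
    // then takes the front context for round-robin scheduling.
    RequestContext* WaitForWork(DiskIoMgr::DiskQueue* queue, const bool& shut_down) {
      boost::unique_lock<boost::mutex> disk_lock(queue->lock);
      while (!shut_down && queue->request_contexts.empty()) {
        queue->work_available.Wait(disk_lock);
      }
      if (shut_down) return nullptr;
      RequestContext* context = queue->request_contexts.front();
      queue->request_contexts.pop_front();
      return context;
    }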

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/io/disk-io-mgr-stress-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress-test.cc b/be/src/runtime/io/disk-io-mgr-stress-test.cc
new file mode 100644
index 0000000..45b36ed
--- /dev/null
+++ b/be/src/runtime/io/disk-io-mgr-stress-test.cc
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/io/disk-io-mgr-stress.h"
+#include "util/cpu-info.h"
+#include "util/string-parser.h"
+
+#include "common/names.h"
+
+using namespace impala;
+using namespace impala::io;
+
+// Simple utility to run the disk io stress test.  An optional second parameter
+// can be passed to control how long to run this test (0 for forever).
+
+// TODO: make these configurable once we decide how to run BE tests with args
+const int DEFAULT_DURATION_SEC = 1;
+const int NUM_DISKS = 5;
+const int NUM_THREADS_PER_DISK = 5;
+const int NUM_CLIENTS = 10;
+const bool TEST_CANCELLATION = true;
+
+int main(int argc, char** argv) {
+  google::InitGoogleLogging(argv[0]);
+  CpuInfo::Init();
+  OsInfo::Init();
+  impala::InitThreading();
+  int duration_sec = DEFAULT_DURATION_SEC;
+
+  if (argc == 2) {
+    StringParser::ParseResult status;
+    duration_sec = StringParser::StringToInt<int>(argv[1], strlen(argv[1]), &status);
+    if (status != StringParser::PARSE_SUCCESS) {
+      printf("Invalid arg: %s\n", argv[1]);
+      return 1;
+    }
+  }
+  if (duration_sec != 0) {
+    printf("Running stress test for %d seconds.\n", duration_sec);
+  } else {
+    printf("Running stress test indefinitely.\n");
+  }
+  DiskIoMgrStress test(NUM_DISKS, NUM_THREADS_PER_DISK, NUM_CLIENTS, TEST_CANCELLATION);
+  test.Run(duration_sec);
+
+  return 0;
+}
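Usage note, not part of the patch: the resulting binary takes an optional duration argument in seconds, so for example "disk-io-mgr-stress-test 60" runs the stress test for a minute, "disk-io-mgr-stress-test 0" runs it until interrupted, and running it with no argument uses DEFAULT_DURATION_SEC.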

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/io/disk-io-mgr-stress.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc
new file mode 100644
index 0000000..8815357
--- /dev/null
+++ b/be/src/runtime/io/disk-io-mgr-stress.cc
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <boost/thread/mutex.hpp>
+
+#include "runtime/io/disk-io-mgr-stress.h"
+
+#include "runtime/io/request-context.h"
+#include "util/time.h"
+
+#include "common/names.h"
+
+using namespace impala;
+using namespace impala::io;
+
+static const float ABORT_CHANCE = .10f;
+static const int MIN_READ_LEN = 1;
+static const int MAX_READ_LEN = 20;
+
+static const int MIN_FILE_LEN = 10;
+static const int MAX_FILE_LEN = 1024;
+
+// Make sure this is between MIN/MAX FILE_LEN to test more cases
+static const int MIN_READ_BUFFER_SIZE = 64;
+static const int MAX_READ_BUFFER_SIZE = 128;
+
+static const int CANCEL_READER_PERIOD_MS = 20;  // in ms
+
+static void CreateTempFile(const char* filename, const char* data) {
+  FILE* file = fopen(filename, "w");
+  CHECK(file != NULL);
+  fwrite(data, 1, strlen(data), file);
+  fclose(file);
+}
+
+string GenerateRandomData() {
+  int rand_len = rand() % (MAX_FILE_LEN - MIN_FILE_LEN) + MIN_FILE_LEN;
+  stringstream ss;
+  for (int i = 0; i < rand_len; ++i) {
+    char c = rand() % 26 + 'a';
+    ss << c;
+  }
+  return ss.str();
+}
+
+struct DiskIoMgrStress::Client {
+  boost::mutex lock;
+  unique_ptr<RequestContext> reader;
+  int file_idx;
+  vector<ScanRange*> scan_ranges;
+  int abort_at_byte;
+  int files_processed;
+};
+
+DiskIoMgrStress::DiskIoMgrStress(int num_disks, int num_threads_per_disk,
+     int num_clients, bool includes_cancellation) :
+    num_clients_(num_clients),
+    includes_cancellation_(includes_cancellation) {
+
+  time_t rand_seed = time(NULL);
+  LOG(INFO) << "Running with rand seed: " << rand_seed;
+  srand(rand_seed);
+
+  io_mgr_.reset(new DiskIoMgr(num_disks, num_threads_per_disk, num_threads_per_disk,
+      MIN_READ_BUFFER_SIZE, MAX_READ_BUFFER_SIZE));
+  Status status = io_mgr_->Init(&mem_tracker_);
+  CHECK(status.ok());
+
+  // Initialize some data files.  It doesn't really matter how many there are.
+  files_.resize(num_clients * 2);
+  for (int i = 0; i < files_.size(); ++i) {
+    stringstream ss;
+    ss << "/tmp/disk_io_mgr_stress_file" << i;
+    files_[i].filename = ss.str();
+    files_[i].data = GenerateRandomData();
+    CreateTempFile(files_[i].filename.c_str(), files_[i].data.c_str());
+  }
+
+  clients_ = new Client[num_clients_];
+  client_mem_trackers_.resize(num_clients_);
+  for (int i = 0; i < num_clients_; ++i) {
+    NewClient(i);
+  }
+}
+
+void DiskIoMgrStress::ClientThread(int client_id) {
+  Client* client = &clients_[client_id];
+  Status status;
+  char read_buffer[MAX_FILE_LEN];
+
+  while (!shutdown_) {
+    bool eos = false;
+    int bytes_read = 0;
+
+    const string& expected = files_[client->file_idx].data;
+
+    while (!eos) {
+      ScanRange* range;
+      Status status = io_mgr_->GetNextRange(client->reader.get(), &range);
+      CHECK(status.ok() || status.IsCancelled());
+      if (range == NULL) break;
+
+      while (true) {
+        unique_ptr<BufferDescriptor> buffer;
+        status = range->GetNext(&buffer);
+        CHECK(status.ok() || status.IsCancelled());
+        if (buffer == NULL) break;
+
+        int64_t scan_range_offset = buffer->scan_range_offset();
+        int len = buffer->len();
+        CHECK_GE(scan_range_offset, 0);
+        CHECK_LT(scan_range_offset, expected.size());
+        CHECK_GT(len, 0);
+
+        // We get scan ranges back in arbitrary order, so convert the scan range
+        // offset to the file offset.
+        int64_t file_offset = scan_range_offset + range->offset();
+
+        // Validate the bytes read
+        CHECK_LE(file_offset + len, expected.size());
+        CHECK_EQ(strncmp(reinterpret_cast<char*>(buffer->buffer()),
+                     &expected.c_str()[file_offset], len), 0);
+
+        // Copy the bytes from this read into the result buffer.
+        memcpy(read_buffer + file_offset, buffer->buffer(), buffer->len());
+        io_mgr_->ReturnBuffer(move(buffer));
+        bytes_read += len;
+
+        CHECK_GE(bytes_read, 0);
+        CHECK_LE(bytes_read, expected.size());
+
+        if (bytes_read > client->abort_at_byte) {
+          eos = true;
+          break;
+        }
+      } // End of buffer
+    } // End of scan range
+
+    if (bytes_read == expected.size()) {
+      // This entire file was read without being cancelled; validate the entire result.
+      CHECK(status.ok());
+      CHECK_EQ(strncmp(read_buffer, expected.c_str(), bytes_read), 0);
+    }
+
+    // Unregister the old client and get a new one
+    unique_lock<mutex> lock(client->lock);
+    io_mgr_->UnregisterContext(client->reader.get());
+    NewClient(client_id);
+  }
+
+  unique_lock<mutex> lock(client->lock);
+  io_mgr_->UnregisterContext(client->reader.get());
+  client->reader = NULL;
+}
+
+// Cancel a random reader
+void DiskIoMgrStress::CancelRandomReader() {
+  if (!includes_cancellation_) return;
+
+  int rand_client = rand() % num_clients_;
+
+  unique_lock<mutex> lock(clients_[rand_client].lock);
+  io_mgr_->CancelContext(clients_[rand_client].reader.get());
+}
+
+void DiskIoMgrStress::Run(int sec) {
+  shutdown_ = false;
+  for (int i = 0; i < num_clients_; ++i) {
+    readers_.add_thread(
+        new thread(&DiskIoMgrStress::ClientThread, this, i));
+  }
+
+  // Sleep and let the clients do their thing for 'sec'
+  for (int loop_count = 1; sec == 0 || loop_count <= sec; ++loop_count) {
+    int iter = (1000) / CANCEL_READER_PERIOD_MS;
+    for (int i = 0; i < iter; ++i) {
+      SleepForMs(CANCEL_READER_PERIOD_MS);
+      CancelRandomReader();
+    }
+    LOG(ERROR) << "Finished iteration: " << loop_count;
+  }
+
+  // Signal shutdown for the client threads
+  shutdown_ = true;
+
+  for (int i = 0; i < num_clients_; ++i) {
+    unique_lock<mutex> lock(clients_[i].lock);
+    if (clients_[i].reader != NULL) io_mgr_->CancelContext(clients_[i].reader.get());
+  }
+
+  readers_.join_all();
+}
+
+// Initialize a client to read one of the files at random.  The scan ranges are
+// assigned randomly.
+void DiskIoMgrStress::NewClient(int i) {
+  Client& client = clients_[i];
+  ++client.files_processed;
+  client.file_idx = rand() % files_.size();
+  int file_len = files_[client.file_idx].data.size();
+
+  client.abort_at_byte = file_len;
+
+  if (includes_cancellation_) {
+    float rand_value = rand() / (float)RAND_MAX;
+    if (rand_value < ABORT_CHANCE) {
+      // Abort at a random byte inside the file
+      client.abort_at_byte = rand() % file_len;
+    }
+  }
+
+  for (int i = 0; i < client.scan_ranges.size(); ++i) {
+    delete client.scan_ranges[i];
+  }
+  client.scan_ranges.clear();
+
+  int assigned_len = 0;
+  while (assigned_len < file_len) {
+    int range_len = rand() % (MAX_READ_LEN - MIN_READ_LEN) + MIN_READ_LEN;
+    range_len = min(range_len, file_len - assigned_len);
+
+    ScanRange* range = new ScanRange();
+    range->Reset(NULL, files_[client.file_idx].filename.c_str(), range_len, assigned_len,
+        0, false, BufferOpts::Uncached());
+    client.scan_ranges.push_back(range);
+    assigned_len += range_len;
+  }
+
+  client_mem_trackers_[i].reset(new MemTracker(-1, "", &mem_tracker_));
+  client.reader = io_mgr_->RegisterContext(client_mem_trackers_[i].get());
+  Status status = io_mgr_->AddScanRanges(client.reader.get(), client.scan_ranges);
+  CHECK(status.ok());
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/io/disk-io-mgr-stress.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress.h b/be/src/runtime/io/disk-io-mgr-stress.h
new file mode 100644
index 0000000..b872694
--- /dev/null
+++ b/be/src/runtime/io/disk-io-mgr-stress.h
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef IMPALA_RUNTIME_DISK_IO_MGR_STRESS_H
+#define IMPALA_RUNTIME_DISK_IO_MGR_STRESS_H
+
+#include <memory>
+#include <vector>
+#include <boost/scoped_ptr.hpp>
+#include <boost/thread/thread.hpp>
+
+#include "runtime/io/disk-io-mgr.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/thread-resource-mgr.h"
+
+namespace impala {
+namespace io {
+
+/// Test utility to stress the disk io mgr.  It allows for a configurable
+/// number of clients.  The clients continuously issue work to the io mgr and
+/// asynchronously get cancelled.  The stress test can be run forever or for
+/// a fixed duration.  The unit test runs this for a fixed duration.
+class DiskIoMgrStress {
+ public:
+  DiskIoMgrStress(int num_disks, int num_threads_per_disk, int num_clients,
+      bool includes_cancellation);
+
+  /// Run the test for 'sec'.  If 0, run forever
+  void Run(int sec);
+
+ private:
+  struct Client;
+
+  struct File {
+    std::string filename;
+    std::string data;  // the data in the file, used to validate
+  };
+
+
+  /// Files used for testing.  These are created at startup and recycled
+  /// during the test
+  std::vector<File> files_;
+
+  /// Root mem tracker.
+  MemTracker mem_tracker_;
+
+  /// io manager
+  boost::scoped_ptr<DiskIoMgr> io_mgr_;
+
+  /// Thread group for reader threads
+  boost::thread_group readers_;
+
+  /// Array of clients
+  int num_clients_;
+  Client* clients_;
+
+  /// Client MemTrackers, one per client.
+  std::vector<std::unique_ptr<MemTracker>> client_mem_trackers_;
+
+  /// If true, tests cancelling readers
+  bool includes_cancellation_;
+
+  /// Flag to signal that client reader threads should exit
+  volatile bool shutdown_;
+
+  /// Helper to initialize a new reader client, registering a new reader with the
+  /// io mgr and initializing the scan ranges
+  void NewClient(int i);
+
+  /// Thread running the reader.  When the current reader is done (either normally
+  /// or cancelled), it picks up a new reader
+  void ClientThread(int client_id);
+
+  /// Possibly cancels a random reader.
+  void CancelRandomReader();
+};
+}
+}
+
+#endif
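For completeness, a minimal sketch of how a fixed-duration caller such as the backend unit test mentioned in the class comment might drive DiskIoMgrStress. The gtest scaffolding, test name, and constants are assumptions for illustration; they mirror the standalone binary earlier in this patch and assume CpuInfo/OsInfo/threading initialization has already happened in the test's main().

  #include <gtest/gtest.h>
  #include "runtime/io/disk-io-mgr-stress.h"

  namespace impala {
  namespace io {

  // Hypothetical test; the constants are not mandated anywhere in this patch.
  TEST(DiskIoMgrStressSketch, ShortRun) {
    DiskIoMgrStress stress(/* num_disks */ 5, /* num_threads_per_disk */ 5,
        /* num_clients */ 10, /* includes_cancellation */ true);
    // Validation happens inside the class via CHECKs; the test passes if Run() returns.
    stress.Run(2);  // seconds
  }

  }  // namespace io
  }  // namespace impala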
