http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/disk-io-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h deleted file mode 100644 index 49de0ff..0000000 --- a/be/src/runtime/disk-io-mgr.h +++ /dev/null @@ -1,972 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef IMPALA_RUNTIME_DISK_IO_MGR_H -#define IMPALA_RUNTIME_DISK_IO_MGR_H - -#include <deque> -#include <functional> -#include <vector> - -#include <boost/scoped_ptr.hpp> -#include <boost/unordered_set.hpp> -#include <boost/thread/mutex.hpp> - -#include "common/atomic.h" -#include "common/hdfs.h" -#include "common/object-pool.h" -#include "common/status.h" -#include "runtime/disk-io-mgr-handle-cache.h" -#include "runtime/thread-resource-mgr.h" -#include "util/aligned-new.h" -#include "util/bit-util.h" -#include "util/condition-variable.h" -#include "util/error-util.h" -#include "util/internal-queue.h" -#include "util/runtime-profile.h" -#include "util/thread.h" - -namespace impala { - -class MemTracker; - -/// Manager object that schedules IO for all queries on all disks and remote filesystems -/// (such as S3). 
Each query maps to one or more DiskIoRequestContext objects, each of which -/// has its own queue of scan ranges and/or write ranges. -// -/// The API splits up requesting scan/write ranges (non-blocking) and reading the data -/// (blocking). The DiskIoMgr has worker threads that will read from and write to -/// disk/hdfs/remote-filesystems, allowing interleaving of IO and CPU. This allows us to -/// keep all disks and all cores as busy as possible. -// -/// All public APIs are thread-safe. It is not valid to call any of the APIs after -/// UnregisterContext() returns. -// -/// For Readers: -/// We can model this problem as a multiple producer (threads for each disk), multiple -/// consumer (scan ranges) problem. There are multiple queues that need to be -/// synchronized. Conceptually, there are two queues: -/// 1. The per disk queue: this contains a queue of readers that need reads. -/// 2. The per scan range ready-buffer queue: this contains buffers that have been -/// read and are ready for the caller. -/// The disk queue contains a queue of readers and is scheduled in a round robin fashion. -/// Readers map to scan nodes. The reader then contains a queue of scan ranges. The caller -/// asks the IoMgr for the next range to process. The IoMgr then selects the best range -/// to read based on disk activity and begins reading and queuing buffers for that range. -/// TODO: We should map readers to queries. A reader is the unit of scheduling and queries -/// that have multiple scan nodes shouldn't have more 'turns'. -// -/// For Writers: -/// Data is written via AddWriteRange(). This is non-blocking and adds a WriteRange to a -/// per-disk queue. After the write is complete, a callback in WriteRange is invoked. -/// No memory is allocated within IoMgr for writes and no copies are made. It is the -/// responsibility of the client to ensure that the data to be written is valid and that -/// the file to be written to exists until the callback is invoked. 
-// -/// The IoMgr provides three key APIs. -/// 1. AddScanRanges: this is non-blocking and tells the IoMgr all the ranges that -/// will eventually need to be read. -/// 2. GetNextRange: returns to the caller the next scan range it should process. -/// This is based on disk load. This also begins reading the data in this scan -/// range. This is blocking. -/// 3. ScanRange::GetNext: returns the next buffer for this range. This is blocking. -// -/// The disk threads do not synchronize with each other. The readers and writers don't -/// synchronize with each other. There is a lock and condition variable for each request -/// context queue and each disk queue. -/// IMPORTANT: whenever both locks are needed, the lock order is to grab the context lock -/// before the disk lock. -// -/// Scheduling: If there are multiple request contexts with work for a single disk, the -/// request contexts are scheduled in round-robin order. Multiple disk threads can -/// operate on the same request context. Exactly one request range is processed by a -/// disk thread at a time. If there are multiple scan ranges scheduled via -/// GetNextRange() for a single context, these are processed in round-robin order. -/// If there are multiple scan and write ranges for a disk, a read is always followed -/// by a write, and a write is followed by a read, i.e. reads and writes alternate. -/// If multiple write ranges are enqueued for a single disk, they will be processed -/// by the disk threads in order, but may complete in any order. No guarantees are made -/// on ordering of writes across disks. -// -/// Resource Management: effective resource management in the IoMgr is key to good -/// performance. The IoMgr helps coordinate two resources: CPU and disk. For CPU, -/// spinning up too many threads causes thrashing. -/// Memory usage in the IoMgr comes from queued read buffers. If we queue the minimum -/// (i.e. 1), then the disks are idle while we are processing the buffer. 
If we don't -/// limit the queue, then it is possible we end up queueing the entire data set (i.e. CPU -/// is slower than disks) and run out of memory. -/// For both CPU and memory, we want to model the machine as having a fixed amount of -/// resources. If a single query is running, it should saturate either CPU or Disk -/// as well as using as little memory as possible. With multiple queries, each query -/// should get less CPU. In that case each query will need fewer queued buffers and -/// therefore have less memory usage. -// -/// The IoMgr defers CPU management to the caller. The IoMgr provides a GetNextRange -/// API which will return the next scan range the caller should process. The caller -/// can call this from the desired number of reading threads. Once a scan range -/// has been returned via GetNextRange, the IoMgr will start to buffer reads for -/// that range and it is expected the caller will pull those buffers promptly. For -/// example, if the caller would like to have 1 scanner thread, the read loop -/// would look like: -/// while (more_ranges) -/// range = GetNextRange() -/// while (!range.eosr) -/// buffer = range.GetNext() -/// To have multiple reading threads, the caller would simply spin up the threads -/// and each would process the loops above. -// -/// To control the number of IO buffers, each scan range has a limit of two queued -/// buffers (SCAN_RANGE_READY_BUFFER_LIMIT). If the number of buffers is at capacity, -/// the IoMgr will no longer read for that scan range until the caller has processed -/// a buffer. Assuming the client returns each buffer before requesting the next one -/// from the scan range, then this will consume up to 3 * 8MB = 24MB of I/O buffers per -/// scan range. -// -/// Buffer Management: -/// Buffers for reads are either a) allocated by the IoMgr and transferred to the caller, -/// b) cached HDFS buffers if the scan range uses HDFS caching, or c) provided by the -/// caller when constructing the scan range.
-/// -/// As a caller reads from a scan range, these buffers are wrapped in BufferDescriptors -/// and returned to the caller. The caller must always call ReturnBuffer() on the buffer -/// descriptor to allow recycling of the associated buffer (if there is an -/// IoMgr-allocated or HDFS cached buffer). -/// -/// Caching support: -/// Scan ranges contain metadata on whether or not the data is cached on the DN. In that -/// case, we use the HDFS APIs to read the cached data without doing any copies. For these -/// ranges, the reads happen on the caller thread (as opposed to the disk threads). -/// It is possible for the cached read APIs to fail, in which case the ranges are then -/// queued on the disk threads and behave identically to the case where the range -/// is not cached. -/// Resources for these ranges are also not accounted against the reader because none -/// are consumed. -/// While a cached block is being processed, the block is mlocked. We want to minimize -/// the time the mlock is held. -/// - HDFS will time us out if we hold onto the mlock for too long -/// - Holding the lock prevents uncaching this file due to a caching policy change. -/// Therefore, we only issue the cached read when the caller is ready to process the -/// range (GetNextRange()) instead of when the ranges are issued. This guarantees that -/// there will be a CPU available to process the buffer and any throttling we do with -/// the number of scanner threads properly controls the number of files we mlock. -/// With cached scan ranges, we cannot close the scan range until the cached buffer -/// is returned (HDFS does not allow this). We therefore need to defer the close until -/// the cached buffer is returned (ReturnBuffer()). -// -/// Remote filesystem support (e.g. S3): -/// Remote filesystems are modeled as "remote disks". That is, there is a separate disk -/// queue for each supported remote filesystem type.
In order to maximize throughput, -/// multiple connections are opened in parallel by having multiple threads running per -/// queue. Also note that reading from a remote filesystem service can be more CPU -/// intensive than local disk/hdfs because of non-direct I/O and SSL processing, and can -/// be CPU bottlenecked especially if not enough I/O threads for these queues are -/// started. -// -/// TODO: IoMgr should be able to request additional scan ranges from the coordinator -/// to help deal with stragglers. -/// TODO: look into using a lock free queue -/// TODO: simplify the common path (less locking, memory allocations). -/// TODO: Break this up the .h/.cc into multiple files under an /io subdirectory. -// -/// Structure of the Implementation: -/// - All client APIs are defined in this file -/// - Internal classes are defined in disk-io-mgr-internal.h -/// - ScanRange APIs are implemented in disk-io-mgr-scan-range.cc -/// This contains the ready buffer queue logic -/// - DiskIoRequestContext APIs are implemented in disk-io-mgr-reader-context.cc -/// This contains the logic for picking scan ranges for a reader. -/// - Disk Thread and general APIs are implemented in disk-io-mgr.cc. - -class DiskIoRequestContext; - -// This is cache line aligned because the FileHandleCache needs cache line alignment -// for its partitions. -class DiskIoMgr : public CacheLineAligned { - public: - class ScanRange; - - /// Buffer struct that is used by the caller and IoMgr to pass read buffers. - /// It is is expected that only one thread has ownership of this object at a - /// time. - class BufferDescriptor { - public: - ~BufferDescriptor() { - DCHECK(buffer_ == nullptr); // Check we didn't leak a buffer. 
- } - - ScanRange* scan_range() { return scan_range_; } - uint8_t* buffer() { return buffer_; } - int64_t buffer_len() { return buffer_len_; } - int64_t len() { return len_; } - bool eosr() { return eosr_; } - - /// Returns the offset within the scan range that this buffer starts at - int64_t scan_range_offset() const { return scan_range_offset_; } - - /// Transfer ownership of buffer memory from 'mem_tracker_' to 'dst' and set - /// 'mem_tracker_' to 'dst'. 'mem_tracker_' and 'dst' must be non-NULL. Does not - /// check memory limits on 'dst': the caller should check the memory limit if a - /// different memory limit may apply to 'dst'. If the buffer was a client-provided - /// buffer, transferring is not allowed. - /// TODO: IMPALA-3209: revisit this as part of scanner memory usage revamp. - void TransferOwnership(MemTracker* dst); - - private: - friend class DiskIoMgr; - friend class DiskIoMgr::ScanRange; - friend class DiskIoRequestContext; - - /// Create a buffer descriptor for a new reader, range and data buffer. The buffer - /// memory should already be accounted against 'mem_tracker'. - BufferDescriptor(DiskIoMgr* io_mgr, DiskIoRequestContext* reader, - ScanRange* scan_range, uint8_t* buffer, int64_t buffer_len, - MemTracker* mem_tracker); - - /// Return true if this is a cached buffer owned by HDFS. - bool is_cached() const { - return scan_range_->external_buffer_tag_ - == ScanRange::ExternalBufferTag::CACHED_BUFFER; - } - - /// Return true if this is a buffer owner by the client that was provided when - /// constructing the scan range. - bool is_client_buffer() const { - return scan_range_->external_buffer_tag_ - == ScanRange::ExternalBufferTag::CLIENT_BUFFER; - } - - DiskIoMgr* const io_mgr_; - - /// Reader that this buffer is for. - DiskIoRequestContext* const reader_; - - /// The current tracker this buffer is associated with. After initialisation, - /// NULL for cached buffers and non-NULL for all other buffers. 
- MemTracker* mem_tracker_; - - /// Scan range that this buffer is for. Non-NULL when initialised. - ScanRange* const scan_range_; - - /// buffer with the read contents - uint8_t* buffer_; - - /// length of buffer_. For buffers from cached reads, the length is 0. - const int64_t buffer_len_; - - /// length of read contents - int64_t len_ = 0; - - /// true if the current scan range is complete - bool eosr_ = false; - - /// Status of the read to this buffer. if status is not ok, 'buffer' is nullptr - Status status_; - - int64_t scan_range_offset_ = 0; - }; - - /// The request type, read or write associated with a request range. - struct RequestType { - enum type { - READ, - WRITE, - }; - }; - - /// Represents a contiguous sequence of bytes in a single file. - /// This is the common base class for read and write IO requests - ScanRange and - /// WriteRange. Each disk thread processes exactly one RequestRange at a time. - class RequestRange : public InternalQueue<RequestRange>::Node { - public: - hdfsFS fs() const { return fs_; } - const char* file() const { return file_.c_str(); } - std::string* file_string() { return &file_; } - int64_t offset() const { return offset_; } - int64_t len() const { return len_; } - int disk_id() const { return disk_id_; } - RequestType::type request_type() const { return request_type_; } - - protected: - RequestRange(RequestType::type request_type) - : fs_(nullptr), offset_(-1), len_(-1), disk_id_(-1), request_type_(request_type) {} - - /// Hadoop filesystem that contains file_, or set to nullptr for local filesystem. - hdfsFS fs_; - - /// Path to file being read or written. - std::string file_; - - /// Offset within file_ being read or written. - int64_t offset_; - - /// Length of data read or written. - int64_t len_; - - /// Id of disk containing byte range. - int disk_id_; - - /// The type of IO request, READ or WRITE. - RequestType::type request_type_; - }; - - /// Param struct for different combinations of buffering. 
struct BufferOpts {
 public:
  /// Options for a read into an IoMgr-allocated or HDFS-cached buffer. The HDFS cache
  /// is used only if 'try_cache' is true, the file is in the HDFS cache and 'mtime'
  /// matches the modified time of the cached file. No client buffer is attached.
  BufferOpts(bool try_cache, int64_t mtime)
    : BufferOpts(try_cache, mtime, nullptr, -1) {}

  /// Options for an uncached read into an IoMgr-allocated buffer, i.e. caching is
  /// disabled and there is no client buffer.
  static BufferOpts Uncached() { return ReadInto(nullptr, -1); }

  /// Options to read the entire scan range into 'client_buffer'. The buffer, of
  /// length 'client_buffer_len', must be able to hold the whole scan range. HDFS
  /// caching is not enabled in this case.
  static BufferOpts ReadInto(uint8_t* client_buffer, int64_t client_buffer_len) {
    return BufferOpts(false, NEVER_CACHE, client_buffer, client_buffer_len);
  }

 private:
  friend class ScanRange;

  /// Fully general constructor that every public entry point funnels into.
  BufferOpts(bool cache_enabled, int64_t file_mtime, uint8_t* dest, int64_t dest_len)
    : try_cache_{cache_enabled},
      mtime_{file_mtime},
      client_buffer_{dest},
      client_buffer_len_{dest_len} {}

  /// Sentinel for 'mtime_': the file handle will never be cached, because the
  /// modification time won't match.
  const static int64_t NEVER_CACHE = -1;

  /// If true, read from HDFS cache if possible.
  const bool try_cache_;

  /// Last modified time of the file associated with the scan range. If set to
  /// NEVER_CACHE, caching is disabled.
  const int64_t mtime_;

  /// Destination buffer supplied by the client and its length; nullptr and -1 when
  /// the client supplied no buffer.
  uint8_t* const client_buffer_;
  const int64_t client_buffer_len_;
};

/// ScanRange description. The caller must call Reset() to initialize the fields
/// before calling AddScanRanges(). The private fields are used internally by
/// the IoMgr.
- class ScanRange : public RequestRange { - public: - ScanRange(); - - virtual ~ScanRange(); - - /// Resets this scan range object with the scan range description. The scan range - /// is for bytes [offset, offset + len) in 'file' on 'fs' (which is nullptr for the - /// local filesystem). The scan range must fall within the file bounds (offset >= 0 - /// and offset + len <= file_length). 'disk_id' is the disk queue to add the range - /// to. If 'expected_local' is true, a warning is generated if the read did not - /// come from a local disk. 'buffer_opts' specifies buffer management options - - /// see the DiskIoMgr class comment and the BufferOpts comments for details. - /// 'meta_data' is an arbitrary client-provided pointer for any auxiliary data. - void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id, - bool expected_local, const BufferOpts& buffer_opts, void* meta_data = nullptr); - - void* meta_data() const { return meta_data_; } - bool try_cache() const { return try_cache_; } - bool expected_local() const { return expected_local_; } - - /// Returns the next buffer for this scan range. buffer is an output parameter. - /// This function blocks until a buffer is ready or an error occurred. If this is - /// called when all buffers have been returned, *buffer is set to nullptr and Status::OK - /// is returned. - /// Only one thread can be in GetNext() at any time. - Status GetNext(std::unique_ptr<BufferDescriptor>* buffer) WARN_UNUSED_RESULT; - - /// Cancel this scan range. This cleans up all queued buffers and - /// wakes up any threads blocked on GetNext(). - /// Status is the reason the range was cancelled. Must not be ok(). - /// Status is returned to the user in GetNext(). - void Cancel(const Status& status); - - /// return a descriptive string for debug. 
- std::string DebugString() const; - - int64_t mtime() const { return mtime_; } - - private: - friend class DiskIoMgr; - friend class DiskIoRequestContext; - - /// Initialize internal fields - void InitInternal(DiskIoMgr* io_mgr, DiskIoRequestContext* reader); - - /// Enqueues a buffer for this range. This does not block. - /// Returns true if this scan range has hit the queue capacity, false otherwise. - /// The caller passes ownership of buffer to the scan range and it is not - /// valid to access buffer after this call. The reader lock must be held by the - /// caller. - bool EnqueueBuffer(const boost::unique_lock<boost::mutex>& reader_lock, - std::unique_ptr<BufferDescriptor> buffer); - - /// Cleanup any queued buffers (i.e. due to cancellation). This cannot - /// be called with any locks taken. - void CleanupQueuedBuffers(); - - /// Validates the internal state of this range. lock_ must be taken - /// before calling this. - bool Validate(); - - /// Maximum length in bytes for hdfsRead() calls. - int64_t MaxReadChunkSize() const; - - /// Opens the file for this range. This function only modifies state in this range. - /// If 'use_file_handle_cache' is true and this is a local hdfs file, then this scan - /// range will not maintain an exclusive file handle. It will borrow an hdfs file - /// handle from the file handle cache for each Read(), so Open() does nothing. - /// If 'use_file_handle_cache' is false or this is a remote hdfs file or this is - /// a local OS file, Open() will maintain a file handle on the scan range for - /// exclusive use by this scan range. An exclusive hdfs file handle still comes - /// from the cache, but it is a newly opened file handle that is held for the - /// entire duration of a scan range's lifetime and destroyed in Close(). - /// All local OS files are opened using normal OS file APIs. - Status Open(bool use_file_handle_cache) WARN_UNUSED_RESULT; - - /// Closes the file for this range. 
This function only modifies state in this range. - void Close(); - - /// Reads from this range into 'buffer', which has length 'buffer_len' bytes. Returns - /// the number of bytes read. The read position in this scan range is updated. - Status Read(uint8_t* buffer, int64_t buffer_len, int64_t* bytes_read, - bool* eosr) WARN_UNUSED_RESULT; - - /// Get the read statistics from the Hdfs file handle and aggregate them to - /// the DiskIoRequestContext. This clears the statistics on this file handle. - /// It is safe to pass hdfsFile by value, as hdfsFile's underlying type is a - /// pointer. - void GetHdfsStatistics(hdfsFile fh); - - /// Reads from the DN cache. On success, sets cached_buffer_ to the DN buffer - /// and *read_succeeded to true. - /// If the data is not cached, returns ok() and *read_succeeded is set to false. - /// Returns a non-ok status if it ran into a non-continuable error. - /// The reader lock must be held by the caller. - Status ReadFromCache(const boost::unique_lock<boost::mutex>& reader_lock, - bool* read_succeeded) WARN_UNUSED_RESULT; - - /// Pointer to caller specified metadata. This is untouched by the io manager - /// and the caller can put whatever auxiliary data in here. - void* meta_data_ = nullptr; - - /// If true, this scan range is expected to be cached. Note that this might be wrong - /// since the block could have been uncached. In that case, the cached path - /// will fail and we'll just put the scan range on the normal read path. - bool try_cache_ = false; - - /// If true, we expect this scan range to be a local read. Note that if this is false, - /// it does not necessarily mean we expect the read to be remote, and that we never - /// create scan ranges where some of the range is expected to be remote and some of it - /// local. - /// TODO: we can do more with this - bool expected_local_ = false; - - /// Total number of bytes read remotely. This is necessary to maintain a count of - /// the number of remote scan ranges. 
Since IO statistics can be collected multiple - /// times for a scan range, it is necessary to keep some state about whether this - /// scan range has already been counted as remote. There is also a requirement to - /// log the number of unexpected remote bytes for a scan range. To solve both - /// requirements, maintain num_remote_bytes_ on the ScanRange and push it to the - /// reader_ once at the close of the scan range. - int64_t num_remote_bytes_; - - DiskIoMgr* io_mgr_ = nullptr; - - /// Reader/owner of the scan range - DiskIoRequestContext* reader_ = nullptr; - - /// File handle either to hdfs or local fs (FILE*) - /// The hdfs file handle is only stored here in three cases: - /// 1. The file handle cache is off (max_cached_file_handles == 0). - /// 2. The scan range is using hdfs caching. - /// -OR- - /// 3. The hdfs file is expected to be remote (expected_local_ == false) - /// In each case, the scan range gets a new file handle from the file handle cache - /// at Open(), holds it exclusively, and destroys it in Close(). - union { - FILE* local_file_ = nullptr; - HdfsFileHandle* exclusive_hdfs_fh_; - }; - - /// Tagged union that holds a buffer for the cases when there is a buffer allocated - /// externally from DiskIoMgr that is associated with the scan range. - enum class ExternalBufferTag { CLIENT_BUFFER, CACHED_BUFFER, NO_BUFFER }; - ExternalBufferTag external_buffer_tag_; - union { - /// Valid if the 'external_buffer_tag_' is CLIENT_BUFFER. - struct { - /// Client-provided buffer to read the whole scan range into. - uint8_t* data; - - /// Length of the client-provided buffer. - int64_t len; - } client_buffer_; - - /// Valid and non-NULL if the external_buffer_tag_ is CACHED_BUFFER, which means - /// that a cached read succeeded and all the bytes for the range are in this buffer. - struct hadoopRzBuffer* cached_buffer_ = nullptr; - }; - - /// Lock protecting fields below. - /// This lock should not be taken during Open()/Read()/Close(). 
- /// If DiskIoRequestContext::lock_ and this lock need to be held simultaneously, - /// DiskIoRequestContext::lock_ must be taken first. - boost::mutex lock_; - - /// Number of bytes read so far for this scan range - int bytes_read_; - - /// Status for this range. This is non-ok if is_cancelled_ is true. - /// Note: an individual range can fail without the DiskIoRequestContext being - /// cancelled. This allows us to skip individual ranges. - Status status_; - - /// If true, the last buffer for this scan range has been queued. - bool eosr_queued_ = false; - - /// If true, the last buffer for this scan range has been returned. - bool eosr_returned_ = false; - - /// If true, this scan range has been removed from the reader's in_flight_ranges - /// queue because the ready_buffers_ queue is full. - bool blocked_on_queue_ = false; - - /// IO buffers that are queued for this scan range. - /// Condition variable for GetNext - ConditionVariable buffer_ready_cv_; - std::deque<std::unique_ptr<BufferDescriptor>> ready_buffers_; - - /// Lock that should be taken during hdfs calls. Only one thread (the disk reading - /// thread) calls into hdfs at a time so this lock does not have performance impact. - /// This lock only serves to coordinate cleanup. Specifically it serves to ensure - /// that the disk threads are finished with HDFS calls before is_cancelled_ is set - /// to true and cleanup starts. - /// If this lock and lock_ need to be taken, lock_ must be taken first. - boost::mutex hdfs_lock_; - - /// If true, this scan range has been cancelled. - bool is_cancelled_ = false; - - /// Last modified time of the file associated with the scan range - int64_t mtime_; - }; - - /// Used to specify data to be written to a file and offset. - /// It is the responsibility of the client to ensure that the data to be written is - /// valid and that the file to be written to exists until the callback is invoked. - /// A callback is invoked to inform the client when the write is done. 
- class WriteRange : public RequestRange { - public: - /// This callback is invoked on each WriteRange after the write is complete or the - /// context is cancelled. The status returned by the callback parameter indicates - /// if the write was successful (i.e. Status::OK), if there was an error - /// TStatusCode::RUNTIME_ERROR) or if the context was cancelled - /// (TStatusCode::CANCELLED). The callback is only invoked if this WriteRange was - /// successfully added (i.e. AddWriteRange() succeeded). No locks are held while - /// the callback is invoked. - typedef std::function<void(const Status&)> WriteDoneCallback; - WriteRange(const std::string& file, int64_t file_offset, int disk_id, - WriteDoneCallback callback); - - /// Change the file and offset of this write range. Data and callbacks are unchanged. - /// Can only be called when the write is not in flight (i.e. before AddWriteRange() - /// is called or after the write callback was called). - void SetRange(const std::string& file, int64_t file_offset, int disk_id); - - /// Set the data and number of bytes to be written for this WriteRange. - /// Can only be called when the write is not in flight (i.e. before AddWriteRange() - /// is called or after the write callback was called). - void SetData(const uint8_t* buffer, int64_t len); - - const uint8_t* data() const { return data_; } - - private: - friend class DiskIoMgr; - friend class DiskIoRequestContext; - - /// Data to be written. RequestRange::len_ contains the length of data - /// to be written. - const uint8_t* data_; - - /// Callback to invoke after the write is complete. - WriteDoneCallback callback_; - }; - - /// Create a DiskIoMgr object. This constructor is only used for testing. - /// - num_disks: The number of disks the IoMgr should use. This is used for testing. - /// Specify 0, to have the disk IoMgr query the os for the number of disks. - /// - threads_per_rotational_disk: number of read threads to create per rotational - /// disk. 
/// This is also the max queue depth.
/// - threads_per_solid_state_disk: number of read threads to create per solid state
///   disk. This is also the max queue depth.
/// - min_buffer_size: minimum io buffer size (in bytes)
/// - max_buffer_size: maximum io buffer size (in bytes). Also the max read size.
DiskIoMgr(int num_disks, int threads_per_rotational_disk,
    int threads_per_solid_state_disk, int min_buffer_size, int max_buffer_size);

/// Create DiskIoMgr with default configs.
DiskIoMgr();

/// Clean up all threads and resources. This is mostly useful for testing since
/// for impalad, this object is never destroyed.
~DiskIoMgr();

/// Initialize the IoMgr. Must be called once before any of the other APIs.
Status Init(MemTracker* process_mem_tracker) WARN_UNUSED_RESULT;

/// Allocates tracking structure for a request context.
/// Register a new request context and return it to the caller. The caller must call
/// UnregisterContext() for each context.
/// reader_mem_tracker: Is non-null only for readers. IO buffers
///    used for this reader will be tracked by this. If the limit is exceeded
///    the reader will be cancelled and MEM_LIMIT_EXCEEDED will be returned via
///    GetNext().
std::unique_ptr<DiskIoRequestContext> RegisterContext(MemTracker* reader_mem_tracker);

/// Unregisters context from the disk IoMgr by first cancelling it then blocking until
/// all references to the context are removed from I/O manager internal data structures.
/// This must be called for every RegisterContext() to ensure that the context object
/// can be safely destroyed. It is invalid to add more request ranges to 'context'
/// after this call. This call blocks until all the disk threads have finished cleaning
/// up.
void UnregisterContext(DiskIoRequestContext* context);

/// This function cancels the context asynchronously. All outstanding requests
/// are aborted and tracking structures cleaned up. This does not need to be
/// called if the context finishes normally.
/// This will also fail any outstanding GetNext()/Read requests.
void CancelContext(DiskIoRequestContext* context);

/// Adds the scan ranges to the queues. This call is non-blocking. The caller must
/// not deallocate the scan range pointers before UnregisterContext().
/// If schedule_immediately, the ranges are immediately put on the read queue
/// (i.e. the caller should not/cannot call GetNextRange for these ranges).
/// This can be used to do synchronous reads as well as schedule dependent ranges,
/// as in the case for columnar formats.
Status AddScanRanges(DiskIoRequestContext* reader,
    const std::vector<ScanRange*>& ranges,
    bool schedule_immediately = false) WARN_UNUSED_RESULT;

/// Single-range variant of AddScanRanges().
/// NOTE(review): presumably equivalent to calling AddScanRanges() with a one-element
/// vector - confirm in disk-io-mgr.cc.
Status AddScanRange(DiskIoRequestContext* reader, ScanRange* range,
    bool schedule_immediately = false) WARN_UNUSED_RESULT;

/// Add a WriteRange for the writer. This is non-blocking and schedules the context
/// on the IoMgr disk queue. Does not create any files.
Status AddWriteRange(
    DiskIoRequestContext* writer, WriteRange* write_range) WARN_UNUSED_RESULT;

/// Returns the next unstarted scan range for this reader. When the range is returned,
/// the disk threads in the IoMgr will already have started reading from it. The
/// caller is expected to call ScanRange::GetNext on the returned range.
/// If there are no more unstarted ranges, nullptr is returned.
/// This call is blocking.
Status GetNextRange(DiskIoRequestContext* reader, ScanRange** range) WARN_UNUSED_RESULT;

/// Reads the range and returns the result in buffer.
/// This behaves like the typical synchronous read() api, blocking until the data
/// is read. This can be called while there are outstanding ScanRanges and is
/// thread safe. Multiple threads can be calling Read() per reader at a time.
/// range *cannot* have already been added via AddScanRanges.
/// This can only be used if the scan range fits in a single IO buffer (i.e. is smaller
/// than max_read_buffer_size()) or if reading into a client-provided buffer.
Status Read(DiskIoRequestContext* reader, ScanRange* range,
    std::unique_ptr<BufferDescriptor>* buffer) WARN_UNUSED_RESULT;

/// Returns the buffer to the IoMgr. This must be called for every buffer
/// returned by GetNext()/Read() that did not return an error. This is non-blocking.
/// After calling this, the buffer descriptor is invalid and cannot be accessed.
void ReturnBuffer(std::unique_ptr<BufferDescriptor> buffer);

/// Determine which disk queue this file should be assigned to. Returns an index into
/// disk_queues_. The disk_id is the volume ID for the local disk that holds the
/// files, or -1 if unknown. Flag expected_local is true iff this impalad is
/// co-located with the datanode for this file.
int AssignQueue(const char* file, int disk_id, bool expected_local);

/// TODO: The functions below can be moved to DiskIoRequestContext.
/// Returns the current status of the context.
Status context_status(DiskIoRequestContext* context) const WARN_UNUSED_RESULT;

/// Setters that attach RuntimeProfile counters to a request context.
/// NOTE(review): presumably the IoMgr updates these counters as it services the
/// context - confirm in disk-io-mgr.cc.
void set_bytes_read_counter(DiskIoRequestContext*, RuntimeProfile::Counter*);
void set_read_timer(DiskIoRequestContext*, RuntimeProfile::Counter*);
void set_active_read_thread_counter(DiskIoRequestContext*, RuntimeProfile::Counter*);
void set_disks_access_bitmap(DiskIoRequestContext*, RuntimeProfile::Counter*);

/// Per-reader statistics getters.
/// NOTE(review): exact semantics of each value are defined by the implementation,
/// which is not visible in this header.
int64_t queue_size(DiskIoRequestContext* reader) const;
int64_t bytes_read_local(DiskIoRequestContext* reader) const;
int64_t bytes_read_short_circuit(DiskIoRequestContext* reader) const;
int64_t bytes_read_dn_cache(DiskIoRequestContext* reader) const;
int num_remote_ranges(DiskIoRequestContext* reader) const;
int64_t unexpected_remote_bytes(DiskIoRequestContext* reader) const;
int cached_file_handles_hit_count(DiskIoRequestContext* reader) const;
int cached_file_handles_miss_count(DiskIoRequestContext* reader) const;

/// Returns the read throughput across all readers.
/// TODO: should this be a sliding window? This should report metrics for the
/// last minute, hour and since the beginning.
int64_t GetReadThroughput();

/// Returns the maximum read buffer size
int max_read_buffer_size() const { return max_buffer_size_; }

/// Returns the total number of disk queues (both local and remote).
int num_total_disks() const { return disk_queues_.size(); }

/// Returns the total number of remote "disk" queues.
int num_remote_disks() const { return REMOTE_NUM_DISKS; }

/// Returns the number of local disks attached to the system.
int num_local_disks() const { return num_total_disks() - num_remote_disks(); }

/// The disk ID (and therefore disk_queues_ index) used for DFS accesses.
int RemoteDfsDiskId() const { return num_local_disks() + REMOTE_DFS_DISK_OFFSET; }

/// The disk ID (and therefore disk_queues_ index) used for S3 accesses.
- int RemoteS3DiskId() const { return num_local_disks() + REMOTE_S3_DISK_OFFSET; } - - /// The disk ID (and therefore disk_queues_ index) used for ADLS accesses. - int RemoteAdlsDiskId() const { return num_local_disks() + REMOTE_ADLS_DISK_OFFSET; } - - /// Dumps the disk IoMgr queues (for readers and disks) - std::string DebugString(); - - /// Validates the internal state is consistent. This is intended to only be used - /// for debugging. - bool Validate() const; - - /// Given a FS handle, name and last modified time of the file, gets an HdfsFileHandle - /// from the file handle cache. If 'require_new_handle' is true, the cache will open - /// a fresh file handle. On success, records statistics about whether this was - /// a cache hit or miss in the 'reader' as well as at the system level. In case of an - /// error returns nullptr. - HdfsFileHandle* GetCachedHdfsFileHandle(const hdfsFS& fs, - std::string* fname, int64_t mtime, DiskIoRequestContext *reader, - bool require_new_handle); - - /// Releases a file handle back to the file handle cache when it is no longer in use. - /// If 'destroy_handle' is true, the file handle cache will close the file handle - /// immediately. - void ReleaseCachedHdfsFileHandle(std::string* fname, HdfsFileHandle* fid, - bool destroy_handle); - - /// Reopens a file handle by destroying the file handle and getting a fresh - /// file handle from the cache. Returns an error if the file could not be reopened. - Status ReopenCachedHdfsFileHandle(const hdfsFS& fs, std::string* fname, int64_t mtime, - HdfsFileHandle** fid); - - /// Garbage collect unused I/O buffers up to 'bytes_to_free', or all the buffers if - /// 'bytes_to_free' is -1. - void GcIoBuffers(int64_t bytes_to_free = -1); - - /// The maximum number of ready buffers that can be queued in a scan range. Having two - /// queued buffers (plus the buffer that is returned to the client) gives good - /// performance in most scenarios: - /// 1. 
If the consumer is consuming data faster than we can read from disk, then the - /// queue will be empty most of the time because the buffer will be immediately - /// pulled off the queue as soon as it is added. There will always be an I/O request - /// in the disk queue to maximize I/O throughput, which is the bottleneck in this - /// case. - /// 2. If we can read from disk faster than the consumer is consuming data, the queue - /// will fill up and there will always be a buffer available for the consumer to - /// read, so the consumer will not block and we maximize consumer throughput, which - /// is the bottleneck in this case. - /// 3. If the consumer is consuming data at approximately the same rate as we are - /// reading from disk, then the steady state is that the consumer is processing one - /// buffer and one buffer is in the disk queue. The additional buffer can absorb - /// bursts where the producer runs faster than the consumer or the consumer runs - /// faster than the producer without blocking either the producer or consumer. - static const int SCAN_RANGE_READY_BUFFER_LIMIT = 2; - - /// "Disk" queue offsets for remote accesses. Offset 0 corresponds to - /// disk ID (i.e. disk_queue_ index) of num_local_disks(). - enum { - REMOTE_DFS_DISK_OFFSET = 0, - REMOTE_S3_DISK_OFFSET, - REMOTE_ADLS_DISK_OFFSET, - REMOTE_NUM_DISKS - }; - - private: - friend class BufferDescriptor; - friend class DiskIoRequestContext; - struct DiskQueue; - - friend class DiskIoMgrTest_Buffers_Test; - friend class DiskIoMgrTest_VerifyNumThreadsParameter_Test; - - /// Memory tracker for unused I/O buffers owned by DiskIoMgr. - boost::scoped_ptr<MemTracker> free_buffer_mem_tracker_; - - /// Memory tracker for I/O buffers where the DiskIoRequestContext has no MemTracker. - /// TODO: once IMPALA-3200 is fixed, there should be no more cases where readers don't - /// provide a MemTracker. 
- boost::scoped_ptr<MemTracker> unowned_buffer_mem_tracker_; - - /// Number of worker(read) threads per rotational disk. Also the max depth of queued - /// work to the disk. - const int num_io_threads_per_rotational_disk_; - - /// Number of worker(read) threads per solid state disk. Also the max depth of queued - /// work to the disk. - const int num_io_threads_per_solid_state_disk_; - - /// Maximum read size. This is also the maximum size of each allocated buffer. - const int max_buffer_size_; - - /// The minimum size of each read buffer. - const int min_buffer_size_; - - /// Thread group containing all the worker threads. - ThreadGroup disk_thread_group_; - - /// Options object for cached hdfs reads. Set on startup and never modified. - struct hadoopRzOptions* cached_read_options_ = nullptr; - - /// True if the IoMgr should be torn down. Worker threads watch for this to - /// know to terminate. This variable is read/written to by different threads. - volatile bool shut_down_; - - /// Total bytes read by the IoMgr. - RuntimeProfile::Counter total_bytes_read_counter_; - - /// Total time spent in hdfs reading - RuntimeProfile::Counter read_timer_; - - /// Protects free_buffers_ - boost::mutex free_buffers_lock_; - - /// Free buffers that can be handed out to clients. There is one list for each buffer - /// size, indexed by the Log2 of the buffer size in units of min_buffer_size_. The - /// maximum buffer size is max_buffer_size_, so the maximum index is - /// Log2(max_buffer_size_ / min_buffer_size_). - // - /// E.g. 
if min_buffer_size_ = 1024 bytes: - /// free_buffers_[0] => list of free buffers with size 1024 B - /// free_buffers_[1] => list of free buffers with size 2048 B - /// free_buffers_[10] => list of free buffers with size 1 MB - /// free_buffers_[13] => list of free buffers with size 8 MB - /// free_buffers_[n] => list of free buffers with size 2^n * 1024 B - std::vector<std::deque<uint8_t*>> free_buffers_; - - /// Total number of allocated buffers, used for debugging. - AtomicInt32 num_allocated_buffers_; - - /// Total number of buffers in readers - AtomicInt32 num_buffers_in_readers_; - - /// Per disk queues. This is static and created once at Init() time. One queue is - /// allocated for each local disk on the system and for each remote filesystem type. - /// It is indexed by disk id. - std::vector<DiskQueue*> disk_queues_; - - /// The next disk queue to write to if the actual 'disk_id_' is unknown (i.e. the file - /// is not associated with a particular local disk or remote queue). Used to implement - /// round-robin assignment for that case. - static AtomicInt32 next_disk_id_; - - // Number of file handle cache partitions to use - static const size_t NUM_FILE_HANDLE_CACHE_PARTITIONS = 16; - - // Caching structure that maps file names to cached file handles. The cache has an upper - // limit of entries defined by FLAGS_max_cached_file_handles. Evicted cached file - // handles are closed. - FileHandleCache<NUM_FILE_HANDLE_CACHE_PARTITIONS> file_handle_cache_; - - /// Returns the index into free_buffers_ for a given buffer size - int free_buffers_idx(int64_t buffer_size); - - /// Returns a buffer to read into with size between 'buffer_size' and - /// 'max_buffer_size_', If there is an appropriately-sized free buffer in the - /// 'free_buffers_', that is returned, otherwise a new one is allocated. - /// The returned *buffer_size must be between 0 and 'max_buffer_size_'. 
- /// The buffer memory is tracked against reader's mem tracker, or - /// 'unowned_buffer_mem_tracker_' if the reader does not have one. - std::unique_ptr<BufferDescriptor> GetFreeBuffer( - DiskIoRequestContext* reader, ScanRange* range, int64_t buffer_size); - - /// Disassociates the desc->buffer_ memory from 'desc' (which cannot be nullptr), either - /// freeing it or returning it to 'free_buffers_'. Memory tracking is updated to - /// reflect the transfer of ownership from desc->mem_tracker_ to the disk I/O mgr. - void FreeBufferMemory(BufferDescriptor* desc); - - /// Disk worker thread loop. This function retrieves the next range to process on - /// the disk queue and invokes ReadRange() or Write() depending on the type of Range(). - /// There can be multiple threads per disk running this loop. - void WorkLoop(DiskQueue* queue); - - /// This is called from the disk thread to get the next range to process. It will - /// wait until a scan range and buffer are available, or a write range is available. - /// This functions returns the range to process. - /// Only returns false if the disk thread should be shut down. - /// No locks should be taken before this function call and none are left taken after. - bool GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range, - DiskIoRequestContext** request_context); - - /// Updates disk queue and reader state after a read is complete. The read result - /// is captured in the buffer descriptor. - void HandleReadFinished(DiskQueue* disk_queue, DiskIoRequestContext* reader, - std::unique_ptr<BufferDescriptor> buffer); - - /// Invokes write_range->callback_ after the range has been written and - /// updates per-disk state and handle state. The status of the write OK/RUNTIME_ERROR - /// etc. is passed via write_status and to the callback. - /// The write_status does not affect the writer->status_. That is, an write error does - /// not cancel the writer context - that decision is left to the callback handler. 
- /// TODO: On the read path, consider not canceling the reader context on error. - void HandleWriteFinished( - DiskIoRequestContext* writer, WriteRange* write_range, const Status& write_status); - - /// Validates that range is correctly initialized - Status ValidateScanRange(ScanRange* range) WARN_UNUSED_RESULT; - - /// Write the specified range to disk and calls HandleWriteFinished when done. - /// Responsible for opening and closing the file that is written. - void Write(DiskIoRequestContext* writer_context, WriteRange* write_range); - - /// Helper method to write a range using the specified FILE handle. Returns Status:OK - /// if the write succeeded, or a RUNTIME_ERROR with an appropriate message otherwise. - /// Does not open or close the file that is written. - Status WriteRangeHelper(FILE* file_handle, WriteRange* write_range) WARN_UNUSED_RESULT; - - /// Reads the specified scan range and calls HandleReadFinished when done. - void ReadRange(DiskQueue* disk_queue, DiskIoRequestContext* reader, ScanRange* range); - - /// Try to allocate the next buffer for the scan range, returning the new buffer - /// if successful. If 'reader' is cancelled, cancels the range and returns nullptr. - /// If there is memory pressure and buffers are already queued, adds the range - /// to the blocked ranges and returns nullptr. - std::unique_ptr<BufferDescriptor> TryAllocateNextBufferForRange(DiskQueue* disk_queue, - DiskIoRequestContext* reader, ScanRange* range, int64_t buffer_size); -}; -} - -#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/exec-env.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc index 999b56a..6fb572c 100644 --- a/be/src/runtime/exec-env.cc +++ b/be/src/runtime/exec-env.cc @@ -36,7 +36,7 @@ #include "runtime/client-cache.h" #include "runtime/coordinator.h" #include "runtime/data-stream-mgr.h" -#include "runtime/disk-io-mgr.h" +#include "runtime/io/disk-io-mgr.h" #include "runtime/hbase-table-factory.h" #include "runtime/hdfs-fs-cache.h" #include "runtime/krpc-data-stream-mgr.h" @@ -164,7 +164,7 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int krpc_port, FLAGS_catalog_client_rpc_timeout_ms, FLAGS_catalog_client_rpc_timeout_ms, "", !FLAGS_ssl_client_ca_certificate.empty())), htable_factory_(new HBaseTableFactory()), - disk_io_mgr_(new DiskIoMgr()), + disk_io_mgr_(new io::DiskIoMgr()), webserver_(new Webserver(webserver_port)), pool_mem_trackers_(new PoolMemTrackerRegistry), thread_mgr_(new ThreadResourceMgr), http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/exec-env.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h index 8fafdc5..193fdde 100644 --- a/be/src/runtime/exec-env.h +++ b/be/src/runtime/exec-env.h @@ -43,7 +43,6 @@ class BufferPool; class CallableThreadPool; class DataStreamMgrBase; class DataStreamMgr; -class DiskIoMgr; class QueryExecMgr; class Frontend; class HBaseTableFactory; @@ -65,6 +64,10 @@ class ThreadResourceMgr; class TmpFileMgr; class Webserver; +namespace io { + class DiskIoMgr; +} + /// Execution environment for Impala daemon. Contains all required global structures, and /// handles to singleton services. Clients must call StartServices() exactly once to /// properly initialise service state. 
@@ -116,7 +119,7 @@ class ExecEnv { return catalogd_client_cache_.get(); } HBaseTableFactory* htable_factory() { return htable_factory_.get(); } - DiskIoMgr* disk_io_mgr() { return disk_io_mgr_.get(); } + io::DiskIoMgr* disk_io_mgr() { return disk_io_mgr_.get(); } Webserver* webserver() { return webserver_.get(); } MetricGroup* metrics() { return metrics_.get(); } MemTracker* process_mem_tracker() { return mem_tracker_.get(); } @@ -174,7 +177,7 @@ class ExecEnv { boost::scoped_ptr<ImpalaBackendClientCache> impalad_client_cache_; boost::scoped_ptr<CatalogServiceClientCache> catalogd_client_cache_; boost::scoped_ptr<HBaseTableFactory> htable_factory_; - boost::scoped_ptr<DiskIoMgr> disk_io_mgr_; + boost::scoped_ptr<io::DiskIoMgr> disk_io_mgr_; boost::scoped_ptr<Webserver> webserver_; boost::scoped_ptr<MemTracker> mem_tracker_; boost::scoped_ptr<PoolMemTrackerRegistry> pool_mem_trackers_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/io/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/CMakeLists.txt b/be/src/runtime/io/CMakeLists.txt new file mode 100644 index 0000000..ae89509 --- /dev/null +++ b/be/src/runtime/io/CMakeLists.txt @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +# where to put generated libraries +set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime/io") + +# where to put generated binaries +set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime/io") + +add_library(Io + disk-io-mgr.cc + disk-io-mgr-stress.cc + request-context.cc + scan-range.cc +) +add_dependencies(Io gen-deps) + +# This test runs forever so should not be part of 'make test' +add_executable(disk-io-mgr-stress-test disk-io-mgr-stress-test.cc) +target_link_libraries(disk-io-mgr-stress-test ${IMPALA_TEST_LINK_LIBS}) + +ADD_BE_TEST(disk-io-mgr-test) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/io/disk-io-mgr-internal.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/disk-io-mgr-internal.h b/be/src/runtime/io/disk-io-mgr-internal.h new file mode 100644 index 0000000..3fc3895 --- /dev/null +++ b/be/src/runtime/io/disk-io-mgr-internal.h @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#ifndef IMPALA_RUNTIME_DISK_IO_MGR_INTERNAL_H
+#define IMPALA_RUNTIME_DISK_IO_MGR_INTERNAL_H
+
+#include <unistd.h>
+#include <queue>
+#include <boost/thread/locks.hpp>
+#include <gutil/strings/substitute.h>
+
+#include "common/logging.h"
+#include "runtime/io/request-context.h"
+#include "runtime/io/disk-io-mgr.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/thread-resource-mgr.h"
+#include "util/condition-variable.h"
+#include "util/cpu-info.h"
+#include "util/debug-util.h"
+#include "util/disk-info.h"
+#include "util/filesystem-util.h"
+#include "util/hdfs-util.h"
+#include "util/impalad-metrics.h"
+
+/// This file contains internal structures shared between submodules of the IoMgr. Users
+/// of the IoMgr do not need to include this file.
+namespace impala {
+namespace io {
+
+/// Per-disk state: the request contexts that have work queued on one disk.
+struct DiskIoMgr::DiskQueue {
+  /// Disk id (0-based)
+  int disk_id;
+
+  /// Lock that protects access to 'request_contexts' and 'work_available'
+  boost::mutex lock;
+
+  /// Condition variable to signal the disk threads that there is work to do or the
+  /// thread should shut down. A disk thread will be woken up when there is a reader
+  /// added to the queue. A reader is only on the queue when it has at least one
+  /// scan range that is not blocked on available buffers.
+  ConditionVariable work_available;
+
+  /// List of all request contexts that have work queued on this disk.
+  std::list<RequestContext*> request_contexts;
+
+  /// Appends 'worker' to the queue and wakes the disk threads. Acquires 'lock' itself,
+  inline void EnqueueContext(RequestContext* worker) {
+    {
+      boost::unique_lock<boost::mutex> disk_lock(lock);
+      /// so the caller must not hold it. 'worker' must not already be on the queue.
+      DCHECK(std::find(request_contexts.begin(), request_contexts.end(), worker) ==
+          request_contexts.end());
+      request_contexts.push_back(worker);
+    }
+    work_available.NotifyAll();
+  }
+
+  explicit DiskQueue(int id) : disk_id(id) {}
+};
+}
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/io/disk-io-mgr-stress-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress-test.cc b/be/src/runtime/io/disk-io-mgr-stress-test.cc
new file mode 100644
index 0000000..45b36ed
--- /dev/null
+++ b/be/src/runtime/io/disk-io-mgr-stress-test.cc
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/io/disk-io-mgr-stress.h"
+#include "util/cpu-info.h"
+#include "util/string-parser.h"
+
+#include "common/names.h"
+
+using namespace impala;
+using namespace impala::io;
+
+// Simple utility to run the disk io stress test. An optional command-line argument
+// can be passed to control how long to run this test (0 for forever).
+
+// TODO: make these configurable once we decide how to run BE tests with args
+const int DEFAULT_DURATION_SEC = 1; // Used when no duration argument is given.
+const int NUM_DISKS = 5;
+const int NUM_THREADS_PER_DISK = 5;
+const int NUM_CLIENTS = 10;
+const bool TEST_CANCELLATION = true;
+
+int main(int argc, char** argv) {
+  google::InitGoogleLogging(argv[0]);
+  CpuInfo::Init();
+  OsInfo::Init();
+  impala::InitThreading();
+  int duration_sec = DEFAULT_DURATION_SEC;
+
+  if (argc == 2) {
+    StringParser::ParseResult status;
+    duration_sec = StringParser::StringToInt<int>(argv[1], strlen(argv[1]), &status);
+    if (status != StringParser::PARSE_SUCCESS || duration_sec < 0) {
+      printf("Invalid arg: %s\n", argv[1]);
+      return 1;
+    }
+  }
+  if (duration_sec != 0) {
+    printf("Running stress test for %d seconds.\n", duration_sec);
+  } else {
+    printf("Running stress test indefinitely.\n");
+  }
+  DiskIoMgrStress test(NUM_DISKS, NUM_THREADS_PER_DISK, NUM_CLIENTS, TEST_CANCELLATION);
+  test.Run(duration_sec);
+
+  return 0;
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/io/disk-io-mgr-stress.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc
new file mode 100644
index 0000000..8815357
--- /dev/null
+++ b/be/src/runtime/io/disk-io-mgr-stress.cc
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <boost/thread/mutex.hpp>
+
+#include "runtime/io/disk-io-mgr-stress.h"
+
+#include "runtime/io/request-context.h"
+#include "util/time.h"
+
+#include "common/names.h"
+
+using namespace impala;
+using namespace impala::io;
+
+static const float ABORT_CHANCE = .10f;
+static const int MIN_READ_LEN = 1;
+static const int MAX_READ_LEN = 20;
+
+static const int MIN_FILE_LEN = 10;
+static const int MAX_FILE_LEN = 1024;
+
+// Make sure this is between MIN/MAX FILE_LEN to test more cases
+static const int MIN_READ_BUFFER_SIZE = 64;
+static const int MAX_READ_BUFFER_SIZE = 128;
+
+static const int CANCEL_READER_PERIOD_MS = 20; // in ms
+
+static void CreateTempFile(const char* filename, const char* data) {
+  FILE* file = fopen(filename, "w");
+  CHECK(file != NULL);
+  CHECK_EQ(fwrite(data, 1, strlen(data), file), strlen(data));
+  CHECK_EQ(fclose(file), 0);
+}
+
+string GenerateRandomData() {
+  int rand_len = rand() % (MAX_FILE_LEN - MIN_FILE_LEN) + MIN_FILE_LEN;
+  stringstream ss;
+  for (int i = 0; i < rand_len; ++i) {
+    char c = rand() % 26 + 'a';
+    ss << c;
+  }
+  return ss.str();
+}
+
+struct DiskIoMgrStress::Client {
+  boost::mutex lock;
+  unique_ptr<RequestContext> reader;
+  int file_idx;
+  vector<ScanRange*> scan_ranges;
+  int abort_at_byte;
+  int files_processed = 0; // Incremented by NewClient(), so it needs a defined start.
+};
+
+DiskIoMgrStress::DiskIoMgrStress(int num_disks, int num_threads_per_disk,
+    int num_clients, bool includes_cancellation) :
+    num_clients_(num_clients),
+    includes_cancellation_(includes_cancellation) {
+
+  time_t rand_seed = time(NULL);
+  LOG(INFO) << "Running with rand seed: " << rand_seed;
+  srand(rand_seed);
+
+  io_mgr_.reset(new DiskIoMgr(num_disks, num_threads_per_disk, num_threads_per_disk,
+      MIN_READ_BUFFER_SIZE, MAX_READ_BUFFER_SIZE));
+  Status status = io_mgr_->Init(&mem_tracker_);
+  CHECK(status.ok());
+
+  // Initialize some data files. It doesn't really matter how many there are.
+  files_.resize(num_clients * 2);
+  for (int i = 0; i < files_.size(); ++i) {
+    stringstream ss;
+    ss << "/tmp/disk_io_mgr_stress_file" << i;
+    files_[i].filename = ss.str();
+    files_[i].data = GenerateRandomData();
+    CreateTempFile(files_[i].filename.c_str(), files_[i].data.c_str());
+  }
+
+  clients_ = new Client[num_clients_];
+  client_mem_trackers_.resize(num_clients_);
+  for (int i = 0; i < num_clients_; ++i) {
+    NewClient(i);
+  }
+}
+
+void DiskIoMgrStress::ClientThread(int client_id) {
+  Client* client = &clients_[client_id];
+  Status status;
+  char read_buffer[MAX_FILE_LEN];
+
+  while (!shutdown_) {
+    bool eos = false;
+    int bytes_read = 0;
+
+    const string& expected = files_[client->file_idx].data;
+
+    while (!eos) {
+      ScanRange* range = NULL;
+      Status status = io_mgr_->GetNextRange(client->reader.get(), &range);
+      CHECK(status.ok() || status.IsCancelled());
+      if (range == NULL) break;
+
+      while (true) {
+        unique_ptr<BufferDescriptor> buffer;
+        status = range->GetNext(&buffer);
+        CHECK(status.ok() || status.IsCancelled());
+        if (buffer == NULL) break;
+
+        int64_t scan_range_offset = buffer->scan_range_offset();
+        int len = buffer->len();
+        CHECK_GE(scan_range_offset, 0);
+        CHECK_LT(scan_range_offset, expected.size());
+        CHECK_GT(len, 0);
+
+        // We get scan ranges back in arbitrary order, so translate the offset within
+        // the scan range into an offset within the file.
+        int64_t file_offset = scan_range_offset + range->offset();
+
+        // Validate the bytes read
+        CHECK_LE(file_offset + len, expected.size());
+        CHECK_EQ(strncmp(reinterpret_cast<char*>(buffer->buffer()),
+            &expected.c_str()[file_offset], len), 0);
+
+        // Copy the bytes from this read into the result buffer.
+        memcpy(read_buffer + file_offset, buffer->buffer(), buffer->len());
+        io_mgr_->ReturnBuffer(move(buffer));
+        bytes_read += len;
+
+        CHECK_GE(bytes_read, 0);
+        CHECK_LE(bytes_read, expected.size());
+
+        if (bytes_read > client->abort_at_byte) {
+          eos = true;
+          break;
+        }
+      } // End of buffer
+    } // End of scan range
+
+    if (bytes_read == expected.size()) {
+      // This entire file was read without being cancelled, validate the entire result
+      CHECK(status.ok());
+      CHECK_EQ(strncmp(read_buffer, expected.c_str(), bytes_read), 0);
+    }
+
+    // Unregister the old client and get a new one
+    unique_lock<mutex> lock(client->lock);
+    io_mgr_->UnregisterContext(client->reader.get());
+    NewClient(client_id);
+  }
+
+  unique_lock<mutex> lock(client->lock);
+  io_mgr_->UnregisterContext(client->reader.get());
+  client->reader = NULL;
+}
+
+// Cancel a random reader
+void DiskIoMgrStress::CancelRandomReader() {
+  if (!includes_cancellation_) return;
+
+  int rand_client = rand() % num_clients_;
+
+  unique_lock<mutex> lock(clients_[rand_client].lock);
+  io_mgr_->CancelContext(clients_[rand_client].reader.get());
+}
+
+void DiskIoMgrStress::Run(int sec) {
+  shutdown_ = false;
+  for (int i = 0; i < num_clients_; ++i) {
+    readers_.add_thread(
+        new thread(&DiskIoMgrStress::ClientThread, this, i));
+  }
+
+  // Sleep and let the clients do their thing for 'sec'
+  for (int loop_count = 1; sec == 0 || loop_count <= sec; ++loop_count) {
+    int iter = (1000) / CANCEL_READER_PERIOD_MS;
+    for (int i = 0; i < iter; ++i) {
+      SleepForMs(CANCEL_READER_PERIOD_MS);
+      CancelRandomReader();
+    }
+    LOG(ERROR) << "Finished iteration: " << loop_count;
+  }
+
+  // Signal shutdown for the client threads
+  shutdown_ = true;
+
+  for (int i = 0; i < num_clients_; ++i) {
+    unique_lock<mutex> lock(clients_[i].lock);
+    if (clients_[i].reader != NULL) io_mgr_->CancelContext(clients_[i].reader.get());
+  }
+
+  readers_.join_all();
+}
+
+// Initialize a client to read one of the files at random.
The scan ranges are
+// assigned randomly.
+void DiskIoMgrStress::NewClient(int i) {
+  Client& client = clients_[i];
+  ++client.files_processed;
+  client.file_idx = rand() % files_.size();
+  int file_len = files_[client.file_idx].data.size();
+
+  client.abort_at_byte = file_len;
+
+  if (includes_cancellation_) {
+    float rand_value = rand() / static_cast<float>(RAND_MAX);
+    if (rand_value < ABORT_CHANCE) {
+      // Abort at a random byte inside the file
+      client.abort_at_byte = rand() % file_len;
+    }
+  }
+
+  for (ScanRange* old_range : client.scan_ranges) {
+    delete old_range;
+  }
+  client.scan_ranges.clear();
+
+  int assigned_len = 0;
+  while (assigned_len < file_len) {
+    int range_len = rand() % (MAX_READ_LEN - MIN_READ_LEN) + MIN_READ_LEN;
+    range_len = min(range_len, file_len - assigned_len);
+
+    ScanRange* range = new ScanRange();
+    range->Reset(NULL, files_[client.file_idx].filename.c_str(), range_len, assigned_len,
+        0, false, BufferOpts::Uncached());
+    client.scan_ranges.push_back(range);
+    assigned_len += range_len;
+  }
+
+  client_mem_trackers_[i].reset(new MemTracker(-1, "", &mem_tracker_));
+  client.reader = io_mgr_->RegisterContext(client_mem_trackers_[i].get());
+  Status status = io_mgr_->AddScanRanges(client.reader.get(), client.scan_ranges);
+  CHECK(status.ok());
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/io/disk-io-mgr-stress.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress.h b/be/src/runtime/io/disk-io-mgr-stress.h
new file mode 100644
index 0000000..b872694
--- /dev/null
+++ b/be/src/runtime/io/disk-io-mgr-stress.h
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#ifndef IMPALA_RUNTIME_DISK_IO_MGR_STRESS_H +#define IMPALA_RUNTIME_DISK_IO_MGR_STRESS_H + +#include <memory> +#include <vector> +#include <boost/scoped_ptr.hpp> +#include <boost/thread/thread.hpp> + +#include "runtime/io/disk-io-mgr.h" +#include "runtime/mem-tracker.h" +#include "runtime/thread-resource-mgr.h" + +namespace impala { +namespace io { + +/// Test utility to stress the disk io mgr. It allows for a configurable +/// number of clients. The clients continuously issue work to the io mgr and +/// asynchronously get cancelled. The stress test can be run forever or for +/// a fixed duration. The unit test runs this for a fixed duration. +class DiskIoMgrStress { + public: + DiskIoMgrStress(int num_disks, int num_threads_per_disk, int num_clients, + bool includes_cancellation); + + /// Run the test for 'sec'. If 0, run forever + void Run(int sec); + + private: + struct Client; + + struct File { + std::string filename; + std::string data; // the data in the file, used to validate + }; + + + /// Files used for testing. These are created at startup and recycled + /// during the test + std::vector<File> files_; + + /// Root mem tracker. 
+ MemTracker mem_tracker_; + + /// io manager + boost::scoped_ptr<DiskIoMgr> io_mgr_; + + /// Thread group for reader threads + boost::thread_group readers_; + + /// Array of clients + int num_clients_; + Client* clients_; + + /// Client MemTrackers, one per client. + std::vector<std::unique_ptr<MemTracker>> client_mem_trackers_; + + /// If true, tests cancelling readers + bool includes_cancellation_; + + /// Flag to signal that client reader threads should exit + volatile bool shutdown_; + + /// Helper to initialize a new reader client, registering a new reader with the + /// io mgr and initializing the scan ranges + void NewClient(int i); + + /// Thread running the reader. When the current reader is done (either normally + /// or cancelled), it picks up a new reader + void ClientThread(int client_id); + + /// Possibly cancels a random reader. + void CancelRandomReader(); +}; +} +} + +#endif