http://git-wip-us.apache.org/repos/asf/impala/blob/65680dc4/be/src/runtime/io/request-context.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/request-context.h b/be/src/runtime/io/request-context.h index 9807805..737a16c 100644 --- a/be/src/runtime/io/request-context.h +++ b/be/src/runtime/io/request-context.h @@ -24,9 +24,7 @@ namespace impala { namespace io { /// A request context is used to group together I/O requests belonging to a client of the -/// I/O manager for management and scheduling. For most I/O manager clients it is an -/// opaque pointer, but some clients may need to include this header, e.g. to make the -/// unique_ptr<DiskIoRequestContext> destructor work correctly. +/// I/O manager for management and scheduling. /// /// Implementation Details /// ====================== @@ -83,7 +81,51 @@ namespace io { /// behind at most one write range. class RequestContext { public: - ~RequestContext() { DCHECK_EQ(state_, Inactive) << "Must be unregistered."; } + ~RequestContext() { + DCHECK_EQ(state_, Inactive) << "Must be unregistered. " << DebugString(); + } + + /// Cancel the context asynchronously. All outstanding requests are cancelled + /// asynchronously. This does not need to be called if the context finishes normally. + /// Calling GetNext() on any scan ranges belonging to this RequestContext will return + /// CANCELLED (or another error, if an error was encountered for that scan range before + /// it is cancelled). + void Cancel(); + + bool IsCancelled() { + boost::unique_lock<boost::mutex> lock(lock_); + return state_ == Cancelled; + } + + int64_t queue_size() const { return num_ready_buffers_.Load(); } + int64_t bytes_read_local() const { return bytes_read_local_.Load(); } + int64_t bytes_read_short_circuit() const { return bytes_read_short_circuit_.Load(); } + int64_t bytes_read_dn_cache() const { return bytes_read_dn_cache_.Load(); } + int num_remote_ranges() const { return num_remote_ranges_.Load(); } + int64_t unexpected_remote_bytes() const { return unexpected_remote_bytes_.Load(); } + + int cached_file_handles_hit_count() const { + return cached_file_handles_hit_count_.Load(); + } + + int cached_file_handles_miss_count() const { + return cached_file_handles_miss_count_.Load(); + } + + void set_bytes_read_counter(RuntimeProfile::Counter* bytes_read_counter) { + bytes_read_counter_ = bytes_read_counter; + } + + void set_read_timer(RuntimeProfile::Counter* read_timer) { read_timer_ = read_timer; } + + void set_active_read_thread_counter( + RuntimeProfile::Counter* active_read_thread_counter) { + active_read_thread_counter_ = active_read_thread_counter; + } + + void set_disks_accessed_bitmap(RuntimeProfile::Counter* disks_accessed_bitmap) { + disks_accessed_bitmap_ = disks_accessed_bitmap; + } private: DISALLOW_COPY_AND_ASSIGN(RequestContext); @@ -108,11 +150,24 @@ class RequestContext { RequestContext(DiskIoMgr* parent, int num_disks, MemTracker* tracker); + /// Allocates a buffer to read into with size between + /// max('buffer_size', 'min_buffer_size_') and 'max_buffer_size_'. + /// Does not acquire 'lock_'. + /// TODO: allocate using the buffer pool client associated with this reader. + Status AllocBuffer(ScanRange* range, int64_t buffer_size, + std::unique_ptr<BufferDescriptor>* buffer); + + /// Cleans up a buffer. If the buffer was allocated with AllocBuffer(), frees the buffer + /// memory and releases the consumption to the client MemTracker. Otherwise (e.g.
a + /// client or HDFS cache buffer), just prepares the descriptor to be destroyed. + /// After this is called, buffer->buffer() is NULL. Does not acquire 'lock_'. + void FreeBuffer(BufferDescriptor* buffer); + /// Decrements the number of active disks for this reader. If the disk count /// goes to 0, the disk complete condition variable is signaled. - /// Reader lock must be taken before this call. - void DecrementDiskRefCount() { - // boost doesn't let us dcheck that the reader lock is taken + /// 'lock_' must be held via 'lock'. + void DecrementDiskRefCount(const boost::unique_lock<boost::mutex>& lock) { + DCHECK(lock.mutex() == &lock_ && lock.owns_lock()); DCHECK_GT(num_disks_with_ranges_, 0); if (--num_disks_with_ranges_ == 0) { disks_complete_cond_var_.NotifyAll(); @@ -129,25 +184,25 @@ class RequestContext { /// Adds range to in_flight_ranges, scheduling this reader on the disk threads /// if necessary. - /// Reader lock must be taken before this. - void ScheduleScanRange(ScanRange* range) { + /// 'lock_' must be held via 'lock'. + void ScheduleScanRange(const boost::unique_lock<boost::mutex>& lock, ScanRange* range) { + DCHECK(lock.mutex() == &lock_ && lock.owns_lock()); DCHECK_EQ(state_, Active); DCHECK(range != NULL); RequestContext::PerDiskState& state = disk_states_[range->disk_id()]; state.in_flight_ranges()->Enqueue(range); - state.ScheduleContext(this, range->disk_id()); + state.ScheduleContext(lock, this, range->disk_id()); } - /// Cancels the context with status code 'status' - void Cancel(const Status& status); - /// Cancel the context if not already cancelled, wait for all scan ranges to finish /// and mark the context as inactive, after which it cannot be used. void CancelAndMarkInactive(); /// Adds request range to disk queue for this request context. Currently, /// schedule_immediately must be false if RequestRange is a write range. - void AddRequestRange(RequestRange* range, bool schedule_immediately); + /// Caller must hold 'lock_' via 'lock'. + void AddRequestRange(const boost::unique_lock<boost::mutex>& lock, + RequestRange* range, bool schedule_immediately); /// Validates invariants of reader. Reader lock must be taken beforehand. bool Validate() const; @@ -159,6 +214,7 @@ class RequestContext { DiskIoMgr* const parent_; /// Memory used for this reader. This is unowned by this object. + /// TODO: replace with bp client MemTracker* const mem_tracker_; /// Total bytes read for this reader @@ -187,7 +243,7 @@ class RequestContext { /// Total number of bytes from remote reads that were expected to be local. AtomicInt64 unexpected_remote_bytes_{0}; - /// The number of buffers that have been returned to the reader (via GetNext) that the + /// The number of buffers that have been returned to the reader (via GetNext()) that the /// reader has not returned. Only included for debugging and diagnostics. AtomicInt32 num_buffers_in_reader_{0}; @@ -227,9 +283,6 @@ class RequestContext { /// Current state of the reader State state_ = Active; - /// Status of this reader. Set to non-ok if cancelled. - Status status_; - /// The number of disks with scan ranges remaining (always equal to the sum of /// disks with ranges). int num_disks_with_ranges_ = 0; @@ -237,7 +290,7 @@ class RequestContext { /// This is the list of ranges that are expected to be cached on the DN. /// When the reader asks for a new range (GetNextScanRange()), we first /// return ranges from this list.
- InternalQueue<ScanRange> cached_ranges_; + InternalList<ScanRange> cached_ranges_; /// A list of ranges that should be returned in subsequent calls to /// GetNextRange. @@ -246,11 +299,11 @@ class RequestContext { /// Populating it preemptively means we make worse scheduling decisions. /// We currently populate one range per disk. /// TODO: think about this some more. - InternalQueue<ScanRange> ready_to_start_ranges_; + InternalList<ScanRange> ready_to_start_ranges_; ConditionVariable ready_to_start_ranges_cv_; // used with lock_ /// Ranges that are blocked due to back pressure on outgoing buffers. - InternalQueue<ScanRange> blocked_ranges_; + InternalList<ScanRange> blocked_ranges_; /// Condition variable for UnregisterContext() to wait for all disks to complete ConditionVariable disks_complete_cond_var_; @@ -270,21 +323,9 @@ class RequestContext { next_scan_range_to_start_ = range; } - /// We need to have a memory barrier to prevent this load from being reordered - /// with num_threads_in_op(), since these variables are set without the reader - /// lock taken - bool is_on_queue() const { - bool b = is_on_queue_; - __sync_synchronize(); - return b; - } + bool is_on_queue() const { return is_on_queue_.Load() != 0; } - int num_threads_in_op() const { - int v = num_threads_in_op_.Load(); - // TODO: determine whether this barrier is necessary for any callsites. - AtomicUtil::MemoryBarrier(); - return v; - } + int num_threads_in_op() const { return num_threads_in_op_.Load(); } const InternalQueue<ScanRange>* unstarted_scan_ranges() const { return &unstarted_scan_ranges_; @@ -303,26 +344,41 @@ class RequestContext { InternalQueue<RequestRange>* in_flight_ranges() { return &in_flight_ranges_; } /// Schedules the request context on this disk if it's not already on the queue. - /// Context lock must be taken before this. - void ScheduleContext(RequestContext* context, int disk_id); - - /// Increment the ref count on reader. We need to track the number of threads per - /// reader per disk that are in the unlocked hdfs read code section. This is updated - /// by multiple threads without a lock so we need to use an atomic int. - void IncrementRequestThreadAndDequeue() { + /// context->lock_ must be held by the caller via 'context_lock'. + void ScheduleContext(const boost::unique_lock<boost::mutex>& context_lock, + RequestContext* context, int disk_id); + + /// Increment the count of disk threads that have a reference to this context. These + /// threads do not hold any locks while reading from HDFS, so we need to prevent the + /// RequestContext from being destroyed underneath them. + /// + /// The caller does not need to hold 'lock_', so this can execute concurrently with + /// itself and DecrementDiskThread(). + void IncrementDiskThreadAndDequeue() { + /// Incrementing 'num_threads_in_op_' first so that there is no window when other + /// threads see 'is_on_queue_ == num_threads_in_op_ == 0' and think there are no + /// references left to this context. num_threads_in_op_.Add(1); - is_on_queue_ = false; + is_on_queue_.Store(0); } - void DecrementRequestThread() { num_threads_in_op_.Add(-1); } - - /// Decrement request thread count and do final cleanup if this is the last - /// thread. RequestContext lock must be taken before this. - void DecrementRequestThreadAndCheckDone(RequestContext* context) { - num_threads_in_op_.Add(-1); // Also acts as a barrier. 
- if (!is_on_queue_ && num_threads_in_op_.Load() == 0 && !done_) { - // This thread is the last one for this reader on this disk, do final cleanup - context->DecrementDiskRefCount(); + /// Decrement the count of disk threads with a reference to this context. Does final + /// cleanup if the context is cancelled and this is the last thread for the disk. + /// context->lock_ must be held by the caller via 'context_lock'. + void DecrementDiskThread(const boost::unique_lock<boost::mutex>& context_lock, + RequestContext* context) { + DCHECK(context_lock.mutex() == &context->lock_ && context_lock.owns_lock()); + num_threads_in_op_.Add(-1); + + if (context->state_ != Cancelled) { + DCHECK_EQ(context->state_, Active); + return; + } + // The state is cancelled, check to see if we're the last thread to touch the + // context on this disk. We need to load 'is_on_queue_' and 'num_threads_in_op_' + // in this order to avoid a race with IncrementDiskThreadAndDequeue(). + if (is_on_queue_.Load() == 0 && num_threads_in_op_.Load() == 0 && !done_) { + context->DecrementDiskRefCount(context_lock); done_ = true; } } @@ -335,7 +391,12 @@ class RequestContext { bool done_ = true; /// For each disk, keeps track if the context is on this disk's queue, indicating - /// the disk must do some work for this context. The disk needs to do work in 4 cases: + /// the disk must do some work for this context. 1 means that the context is on the + /// disk queue, 0 means that it's not on the queue (either because it has no ranges + /// active for the disk or because a disk thread dequeued the context and is + /// currently processing a request). + /// + /// The disk needs to do work in 4 cases: /// 1) in_flight_ranges is not empty, the disk needs to read for this reader. /// 2) next_range_to_start is NULL, the disk needs to prepare a scan range to be /// read next. @@ -346,7 +407,15 @@ /// useful that can be done. If there's nothing useful, the disk queue will wake up /// and then remove the reader from the queue. Doing this causes thrashing of the /// threads. - bool is_on_queue_ = false; + /// + /// This variable is important during context cancellation because it indicates + /// whether a queue has a reference to the context that must be released before + /// the context is considered unregistered. Atomically set to false after + /// incrementing 'num_threads_in_op_' when dequeueing so that there is no window + /// when other threads see 'is_on_queue_ == num_threads_in_op_ == 0' and think there + /// are no references left to this context. + /// TODO: this could be combined with 'num_threads_in_op_' to be a single refcount. + AtomicInt32 is_on_queue_{0}; /// For each disk, the number of request ranges that have not been fully read. /// In the non-cancellation path, this will hit 0, and done will be set to true @@ -376,11 +445,11 @@ /// range to ready_to_start_ranges_. ScanRange* next_scan_range_to_start_ = nullptr; - /// For each disk, the number of threads issuing the underlying read/write on behalf - /// of this context. There are a few places where we release the context lock, do some - /// work, and then grab the lock again. Because we don't hold the lock for the - /// entire operation, we need this ref count to keep track of which thread should do - /// final resource cleanup during cancellation. + /// For each disk, the number of disk threads issuing the underlying read/write on + /// behalf of this context.
There are a few places where we release the context lock, + /// do some work, and then grab the lock again. Because we don't hold the lock for + /// the entire operation, we need this ref count to keep track of which thread should + /// do final resource cleanup during cancellation. /// Only the thread that sees the count at 0 should do the final cleanup. AtomicInt32 num_threads_in_op_{0};
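The lock-witness idiom introduced in request-context.h above (DecrementDiskRefCount(), ScheduleScanRange() and AddRequestRange() now take the held boost::unique_lock so the callee can DCHECK ownership) replaces the old comment-only contract noted in the removed line "boost doesn't let us dcheck that the reader lock is taken". A minimal sketch of the same idiom, using std::mutex and assert in place of the boost primitives and DCHECK; the Counter class below is hypothetical and not part of this patch:

#include <cassert>
#include <mutex>

// Hypothetical example of a lock-witness parameter: the callee verifies that
// the caller locked the right mutex and still holds it, instead of trusting
// a comment.
class Counter {
 public:
  void Increment() {
    std::unique_lock<std::mutex> lock(lock_);
    IncrementLocked(lock);  // the lock object is passed as proof of ownership
  }

 private:
  // 'lock_' must be held via 'lock'.
  void IncrementLocked(const std::unique_lock<std::mutex>& lock) {
    assert(lock.mutex() == &lock_ && lock.owns_lock());
    ++value_;
  }

  std::mutex lock_;
  int value_ = 0;
};

The check is two comparisons in debug builds and disappears in release builds (assert under NDEBUG, DCHECK in Impala), so it can be asserted at every private entry point.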
http://git-wip-us.apache.org/repos/asf/impala/blob/65680dc4/be/src/runtime/io/request-ranges.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h index 222f847..ab0810a 100644 --- a/be/src/runtime/io/request-ranges.h +++ b/be/src/runtime/io/request-ranges.h @@ -23,6 +23,7 @@ #include <boost/thread/mutex.hpp> +#include "common/atomic.h" #include "common/hdfs.h" #include "common/status.h" #include "util/condition-variable.h" @@ -55,14 +56,6 @@ class BufferDescriptor { /// Returns the offset within the scan range that this buffer starts at int64_t scan_range_offset() const { return scan_range_offset_; } - /// Transfer ownership of buffer memory from 'mem_tracker_' to 'dst' and set - /// 'mem_tracker_' to 'dst'. 'mem_tracker_' and 'dst' must be non-NULL. Does not - /// check memory limits on 'dst': the caller should check the memory limit if a - /// different memory limit may apply to 'dst'. If the buffer was a client-provided - /// buffer, transferring is not allowed. - /// TODO: IMPALA-3209: revisit this as part of scanner memory usage revamp. - void TransferOwnership(MemTracker* dst); - private: friend class DiskIoMgr; friend class ScanRange; @@ -71,8 +64,7 @@ class BufferDescriptor { /// Create a buffer descriptor for a new reader, range and data buffer. The buffer /// memory should already be accounted against 'mem_tracker'. BufferDescriptor(DiskIoMgr* io_mgr, RequestContext* reader, - ScanRange* scan_range, uint8_t* buffer, int64_t buffer_len, - MemTracker* mem_tracker); + ScanRange* scan_range, uint8_t* buffer, int64_t buffer_len); /// Return true if this is a cached buffer owned by HDFS. bool is_cached() const; @@ -86,14 +78,11 @@ class BufferDescriptor { /// Reader that this buffer is for. RequestContext* const reader_; - /// The current tracker this buffer is associated with. After initialisation, - /// NULL for cached buffers and non-NULL for all other buffers. - MemTracker* mem_tracker_; - /// Scan range that this buffer is for. Non-NULL when initialised. ScanRange* const scan_range_; - /// buffer with the read contents + /// Buffer for the read contents. Must be set to NULL in RequestContext::FreeBuffer() + /// before destruction of the descriptor. uint8_t* buffer_; /// length of buffer_. For buffers from cached reads, the length is 0. @@ -105,9 +94,6 @@ class BufferDescriptor { /// true if the current scan range is complete bool eosr_ = false; - /// Status of the read to this buffer. if status is not ok, 'buffer' is nullptr - Status status_; - int64_t scan_range_offset_ = 0; }; @@ -236,10 +222,17 @@ class ScanRange : public RequestRange { /// Only one thread can be in GetNext() at any time. Status GetNext(std::unique_ptr<BufferDescriptor>* buffer) WARN_UNUSED_RESULT; - /// Cancel this scan range. This cleans up all queued buffers and - /// wakes up any threads blocked on GetNext(). - /// Status is the reason the range was cancelled. Must not be ok(). - /// Status is returned to the user in GetNext(). + /// Returns the buffer to the scan range. This must be called for every buffer + /// returned by GetNext(). After calling this, the buffer descriptor is invalid + /// and cannot be accessed. + void ReturnBuffer(std::unique_ptr<BufferDescriptor> buffer); + + /// Cancel this scan range. This cleans up all queued buffers and wakes up any threads + /// blocked on GetNext(). Status is a non-ok status with the reason the range was + /// cancelled, e.g. 
CANCELLED if the range was cancelled because it was not needed, or + /// another error if an error was encountered while scanning the range. Status is + /// returned to any callers of GetNext(). If a thread is currently blocked in + /// GetNext(), it is woken up. void Cancel(const Status& status); /// return a descriptive string for debug. @@ -263,10 +256,6 @@ bool EnqueueBuffer(const boost::unique_lock<boost::mutex>& reader_lock, std::unique_ptr<BufferDescriptor> buffer); - /// Cleanup any queued buffers (i.e. due to cancellation). This cannot - /// be called with any locks taken. - void CleanupQueuedBuffers(); - /// Validates the internal state of this range. lock_ must be taken /// before calling this. bool Validate(); @@ -283,6 +272,10 @@ /// exclusive use by this scan range. The scan range is the exclusive owner of the /// file handle, and the file handle is destroyed in Close(). /// All local OS files are opened using normal OS file APIs. + /// + /// If an error is encountered during opening, returns a status describing the error. + /// If the scan range was cancelled, returns the reason for cancellation. Otherwise, on + /// success, returns OK. Status Open(bool use_file_handle_cache) WARN_UNUSED_RESULT; /// Closes the file for this range. This function only modifies state in this range. @@ -290,6 +283,10 @@ /// Reads from this range into 'buffer', which has length 'buffer_len' bytes. Returns /// the number of bytes read. The read position in this scan range is updated. + /// + /// If an error is encountered during reading, returns a status describing the error. + /// If the scan range was cancelled, returns the reason for cancellation. Otherwise, on + /// success, returns OK. Status Read(uint8_t* buffer, int64_t buffer_len, int64_t* bytes_read, bool* eosr) WARN_UNUSED_RESULT; @@ -307,6 +304,23 @@ Status ReadFromCache(const boost::unique_lock<boost::mutex>& reader_lock, bool* read_succeeded) WARN_UNUSED_RESULT; + /// Cleans up a buffer that was not returned to the client. + /// Either ReturnBuffer() or CleanUpBuffer() is called for every BufferDescriptor. + /// This function will acquire 'lock_' and may acquire 'hdfs_lock_'. + void CleanUpBuffer(std::unique_ptr<BufferDescriptor> buffer); + + /// Same as CleanUpBuffer() except the caller must already hold 'lock_' via + /// 'scan_range_lock'. + void CleanUpBufferLocked(const boost::unique_lock<boost::mutex>& scan_range_lock, + std::unique_ptr<BufferDescriptor> buffer); + + /// Returns true if no more buffers will be returned to clients in the future, + /// either because of hitting eosr or cancellation. + bool all_buffers_returned(const boost::unique_lock<boost::mutex>& lock) const { + DCHECK(lock.mutex() == &lock_ && lock.owns_lock()); + return !cancel_status_.ok() || (eosr_queued_ && ready_buffers_.empty()); + } + /// Pointer to caller specified metadata. This is untouched by the io manager /// and the caller can put whatever auxiliary data in here. void* meta_data_ = nullptr; @@ -323,6 +337,9 @@ /// TODO: we can do more with this bool expected_local_ = false; + /// Last modified time of the file associated with the scan range. Set in Reset(). + int64_t mtime_; + /// Total number of bytes read remotely. This is necessary to maintain a count of /// the number of remote scan ranges.
Since IO statistics can be collected multiple /// times for a scan range, it is necessary to keep some state about whether this @@ -378,26 +395,29 @@ class ScanRange : public RequestRange { /// Number of bytes read so far for this scan range int bytes_read_; - /// Status for this range. This is non-ok if is_cancelled_ is true. - /// Note: an individual range can fail without the RequestContext being - /// cancelled. This allows us to skip individual ranges. - Status status_; + /// The number of buffers that have been returned to a client via GetNext() that have + /// not yet been returned with ReturnBuffer(). + int num_buffers_in_reader_ = 0; /// If true, the last buffer for this scan range has been queued. + /// If this is true and 'ready_buffers_' is empty, then no more buffers will be + /// returned to the caller by this scan range. bool eosr_queued_ = false; - /// If true, the last buffer for this scan range has been returned. - bool eosr_returned_ = false; - /// If true, this scan range has been removed from the reader's in_flight_ranges /// queue because the ready_buffers_ queue is full. bool blocked_on_queue_ = false; - /// IO buffers that are queued for this scan range. - /// Condition variable for GetNext - ConditionVariable buffer_ready_cv_; + /// IO buffers that are queued for this scan range. When Cancel() is called + /// this is drained by the cancelling thread. I.e. this is always empty if + /// 'cancel_status_' is not OK. std::deque<std::unique_ptr<BufferDescriptor>> ready_buffers_; + /// Condition variable for threads in GetNext() that are waiting for the next buffer. + /// Signalled when a buffer is enqueued in 'ready_buffers_' or the scan range is + /// cancelled. + ConditionVariable buffer_ready_cv_; + /// Lock that should be taken during hdfs calls. Only one thread (the disk reading /// thread) calls into hdfs at a time so this lock does not have performance impact. /// This lock only serves to coordinate cleanup. Specifically it serves to ensure @@ -406,11 +426,16 @@ class ScanRange : public RequestRange { /// If this lock and lock_ need to be taken, lock_ must be taken first. boost::mutex hdfs_lock_; - /// If true, this scan range has been cancelled. - bool is_cancelled_ = false; - - /// Last modified time of the file associated with the scan range - int64_t mtime_; + /// If non-OK, this scan range has been cancelled. This status is the reason for + /// cancellation - CANCELLED if cancelled without error, or another status if an + /// error caused cancellation. Note that a range can be cancelled without cancelling + /// the owning context. This means that ranges can be cancelled or hit errors without + /// aborting all scan ranges. + // + /// Writers must hold both 'lock_' and 'hdfs_lock_'. Readers must hold either 'lock_' + /// or 'hdfs_lock_'. This prevents the range from being cancelled while any thread + /// is inside a critical section. + Status cancel_status_; }; /// Used to specify data to be written to a file and offset. 
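The reworked ScanRange interface above pairs every buffer handed out by GetNext() with a mandatory ReturnBuffer() call, and folds cancellation into 'cancel_status_' so that errors and cancellation both surface through GetNext(). A sketch of the consumer side of that contract, assuming the Impala headers are available; DrainScanRange() is a hypothetical helper written for illustration, not part of the patch:

#include <memory>
#include <utility>

#include "common/status.h"
#include "runtime/io/request-ranges.h"

using impala::Status;
using impala::io::BufferDescriptor;
using impala::io::ScanRange;

// Hypothetical consumer: reads a scan range to end-of-stream, returning every
// buffer it receives.
Status DrainScanRange(ScanRange* range) {
  bool eosr = false;
  while (!eosr) {
    std::unique_ptr<BufferDescriptor> buffer;
    // After Cancel(), this returns CANCELLED or the error that caused it.
    RETURN_IF_ERROR(range->GetNext(&buffer));
    eosr = buffer->eosr();
    // ... consume buffer->buffer() / buffer->len() here ...
    range->ReturnBuffer(std::move(buffer));  // required for every buffer
  }
  return Status::OK();
}

Per the CleanUpBufferLocked() comment in scan-range.cc below, the range closes itself once no more buffers will be produced and the reader holds none, so a client that follows this loop does not manage the file handle lifecycle.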
http://git-wip-us.apache.org/repos/asf/impala/blob/65680dc4/be/src/runtime/io/scan-range.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc index 21daa96..1ffba00 100644 --- a/be/src/runtime/io/scan-range.cc +++ b/be/src/runtime/io/scan-range.cc @@ -45,19 +45,15 @@ DEFINE_int64(adls_read_chunk_size, 128 * 1024, "The maximum read chunk size to u bool ScanRange::EnqueueBuffer( const unique_lock<mutex>& reader_lock, unique_ptr<BufferDescriptor> buffer) { DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock()); + DCHECK(buffer->buffer_ != nullptr) << "Cannot enqueue freed buffer"; { unique_lock<mutex> scan_range_lock(lock_); DCHECK(Validate()) << DebugString(); - DCHECK(!eosr_returned_); DCHECK(!eosr_queued_); - if (is_cancelled_) { - // Return the buffer, this range has been cancelled - if (buffer->buffer_ != nullptr) { - io_mgr_->num_buffers_in_readers_.Add(1); - reader_->num_buffers_in_reader_.Add(1); - } + if (!cancel_status_.ok()) { + // This range has been cancelled, no need to enqueue the buffer. reader_->num_used_buffers_.Add(-1); - io_mgr_->ReturnBuffer(move(buffer)); + CleanUpBufferLocked(scan_range_lock, move(buffer)); return false; } reader_->num_ready_buffers_.Add(1); @@ -67,9 +63,7 @@ bool ScanRange::EnqueueBuffer( DCHECK_LE(ready_buffers_.size(), DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT); blocked_on_queue_ = ready_buffers_.size() == DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT; } - buffer_ready_cv_.NotifyOne(); - return blocked_on_queue_; } @@ -78,56 +72,48 @@ Status ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) { bool eosr; { unique_lock<mutex> scan_range_lock(lock_); - if (eosr_returned_) return Status::OK(); DCHECK(Validate()) << DebugString(); + // No more buffers to return - return the cancel status or OK if not cancelled. + if (all_buffers_returned(scan_range_lock)) return cancel_status_; - while (ready_buffers_.empty() && !is_cancelled_) { + while (ready_buffers_.empty() && cancel_status_.ok()) { buffer_ready_cv_.Wait(scan_range_lock); } - - if (is_cancelled_) { - DCHECK(!status_.ok()); - return status_; - } + /// Propagate cancellation to the client if it happened while we were waiting. + RETURN_IF_ERROR(cancel_status_); // Remove the first ready buffer from the queue and return it DCHECK(!ready_buffers_.empty()); DCHECK_LE(ready_buffers_.size(), DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT); *buffer = move(ready_buffers_.front()); ready_buffers_.pop_front(); - eosr_returned_ = (*buffer)->eosr(); eosr = (*buffer)->eosr(); } // Update tracking counters. The buffer has now moved from the IoMgr to the // caller. - io_mgr_->num_buffers_in_readers_.Add(1); - reader_->num_buffers_in_reader_.Add(1); reader_->num_ready_buffers_.Add(-1); reader_->num_used_buffers_.Add(-1); if (eosr) reader_->num_finished_ranges_.Add(1); - Status status = (*buffer)->status_; - if (!status.ok()) { - io_mgr_->ReturnBuffer(move(*buffer)); - return status; - } - unique_lock<mutex> reader_lock(reader_->lock_); DCHECK(reader_->Validate()) << endl << reader_->DebugString(); if (reader_->state_ == RequestContext::Cancelled) { reader_->blocked_ranges_.Remove(this); - Cancel(reader_->status_); - io_mgr_->ReturnBuffer(move(*buffer)); - return status_; + Cancel(Status::CANCELLED); + CleanUpBuffer(move(*buffer)); + return Status::CANCELLED; } + // At this point success is guaranteed so increment counters for returned buffers. 
+ reader_->num_buffers_in_reader_.Add(1); { // Check to see if we can re-schedule a blocked range. Note that EnqueueBuffer() // may have been called after we released 'lock_' above so we need to re-check // whether the queue is full. unique_lock<mutex> scan_range_lock(lock_); + ++num_buffers_in_reader_; if (blocked_on_queue_ && ready_buffers_.size() < DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT && !eosr_queued_) { @@ -135,51 +121,81 @@ Status ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) { // This scan range was blocked and is no longer, add it to the reader // queue again. reader_->blocked_ranges_.Remove(this); - reader_->ScheduleScanRange(this); + reader_->ScheduleScanRange(reader_lock, this); } } return Status::OK(); } +void ScanRange::ReturnBuffer(unique_ptr<BufferDescriptor> buffer_desc) { + reader_->num_buffers_in_reader_.Add(-1); + { + unique_lock<mutex> scan_range_lock(lock_); + --num_buffers_in_reader_; + CleanUpBufferLocked(scan_range_lock, move(buffer_desc)); + } +} + +void ScanRange::CleanUpBuffer(unique_ptr<BufferDescriptor> buffer_desc) { + unique_lock<mutex> scan_range_lock(lock_); + CleanUpBufferLocked(scan_range_lock, move(buffer_desc)); +} + +void ScanRange::CleanUpBufferLocked( + const boost::unique_lock<boost::mutex>& scan_range_lock, + unique_ptr<BufferDescriptor> buffer_desc) { + DCHECK(scan_range_lock.mutex() == &lock_ && scan_range_lock.owns_lock()); + DCHECK(buffer_desc != nullptr); + DCHECK_EQ(this, buffer_desc->scan_range_); + buffer_desc->reader_->FreeBuffer(buffer_desc.get()); + + if (all_buffers_returned(scan_range_lock) && num_buffers_in_reader_ == 0) { + // Close the scan range if there are no more buffers in the reader and no more buffers + // will be returned to readers in future. Close() is idempotent so it is ok to call + // multiple times during cleanup so long as the range is actually finished. + Close(); + } +} + void ScanRange::Cancel(const Status& status) { // Cancelling a range that was never started, ignore. if (io_mgr_ == nullptr) return; DCHECK(!status.ok()); { - // Grab both locks to make sure that all working threads see is_cancelled_. + // Grab both locks to make sure that we don't change 'cancel_status_' while other + // threads are in critical sections. unique_lock<mutex> scan_range_lock(lock_); - unique_lock<mutex> hdfs_lock(hdfs_lock_); - DCHECK(Validate()) << DebugString(); - if (is_cancelled_) return; - is_cancelled_ = true; - status_ = status; + { + unique_lock<mutex> hdfs_lock(hdfs_lock_); + DCHECK(Validate()) << DebugString(); + // If already cancelled, preserve the original reason for cancellation. The first + // thread to set 'cancel_status_' does the cleanup below. + RETURN_VOID_IF_ERROR(cancel_status_); + cancel_status_ = status; + } + + /// Clean up 'ready_buffers_' while still holding 'lock_' to prevent other threads + /// from seeing inconsistent state. + reader_->num_used_buffers_.Add(-ready_buffers_.size()); + reader_->num_ready_buffers_.Add(-ready_buffers_.size()); + while (!ready_buffers_.empty()) { + CleanUpBufferLocked(scan_range_lock, move(ready_buffers_.front())); + ready_buffers_.pop_front(); + } } buffer_ready_cv_.NotifyAll(); - CleanupQueuedBuffers(); // For cached buffers, we can't close the range until the cached buffer is returned. - // Close() is called from DiskIoMgr::ReturnBuffer(). + // Close() is called from ScanRange::CleanUpBufferLocked(). 
if (external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) Close(); } -void ScanRange::CleanupQueuedBuffers() { - DCHECK(is_cancelled_); - io_mgr_->num_buffers_in_readers_.Add(ready_buffers_.size()); - reader_->num_buffers_in_reader_.Add(ready_buffers_.size()); - reader_->num_used_buffers_.Add(-ready_buffers_.size()); - reader_->num_ready_buffers_.Add(-ready_buffers_.size()); - - while (!ready_buffers_.empty()) { - io_mgr_->ReturnBuffer(move(ready_buffers_.front())); - ready_buffers_.pop_front(); - } -} - string ScanRange::DebugString() const { stringstream ss; ss << "file=" << file_ << " disk_id=" << disk_id_ << " offset=" << offset_ << " len=" << len_ << " bytes_read=" << bytes_read_ + << " cancel_status=" << cancel_status_.GetDetail() << " buffer_queue=" << ready_buffers_.size() << " hdfs_file=" << exclusive_hdfs_fh_; return ss.str(); @@ -187,14 +203,12 @@ string ScanRange::DebugString() const { bool ScanRange::Validate() { if (bytes_read_ > len_) { - LOG(WARNING) << "Bytes read tracking is wrong. Shouldn't read past the scan range." + LOG(ERROR) << "Bytes read tracking is wrong. Shouldn't read past the scan range." << " bytes_read_=" << bytes_read_ << " len_=" << len_; return false; } - if (eosr_returned_ && !eosr_queued_) { - LOG(WARNING) << "Returned eosr to reader before finishing reading the scan range" - << " eosr_returned_=" << eosr_returned_ - << " eosr_queued_=" << eosr_queued_; + if (!cancel_status_.ok() && !ready_buffers_.empty()) { + LOG(ERROR) << "Cancelled range should not have queued buffers " << DebugString(); return false; } return true; @@ -203,13 +217,14 @@ bool ScanRange::Validate() { ScanRange::ScanRange() : RequestRange(RequestType::READ), num_remote_bytes_(0), - external_buffer_tag_(ExternalBufferTag::NO_BUFFER), - mtime_(-1) {} + external_buffer_tag_(ExternalBufferTag::NO_BUFFER) {} ScanRange::~ScanRange() { DCHECK(exclusive_hdfs_fh_ == nullptr) << "File was not closed."; DCHECK(external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) << "Cached buffer was not released."; + DCHECK_EQ(0, ready_buffers_.size()); + DCHECK_EQ(0, num_buffers_in_reader_); } void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, @@ -253,16 +268,15 @@ void ScanRange::InitInternal(DiskIoMgr* io_mgr, RequestContext* reader) { local_file_ = nullptr; exclusive_hdfs_fh_ = nullptr; bytes_read_ = 0; - is_cancelled_ = false; + cancel_status_ = Status::OK(); eosr_queued_= false; - eosr_returned_= false; blocked_on_queue_ = false; DCHECK(Validate()) << DebugString(); } Status ScanRange::Open(bool use_file_handle_cache) { unique_lock<mutex> hdfs_lock(hdfs_lock_); - if (is_cancelled_) return Status::CANCELLED; + RETURN_IF_ERROR(cancel_status_); if (fs_ != nullptr) { if (exclusive_hdfs_fh_ != nullptr) return Status::OK(); @@ -386,7 +400,7 @@ int64_t ScanRange::MaxReadChunkSize() const { Status ScanRange::Read( uint8_t* buffer, int64_t buffer_len, int64_t* bytes_read, bool* eosr) { unique_lock<mutex> hdfs_lock(hdfs_lock_); - if (is_cancelled_) return Status::CANCELLED; + RETURN_IF_ERROR(cancel_status_); *eosr = false; *bytes_read = 0; @@ -523,7 +537,7 @@ Status ScanRange::ReadFromCache( { unique_lock<mutex> hdfs_lock(hdfs_lock_); - if (is_cancelled_) return Status::CANCELLED; + RETURN_IF_ERROR(cancel_status_); DCHECK(exclusive_hdfs_fh_ != nullptr); DCHECK(external_buffer_tag_ == ExternalBufferTag::NO_BUFFER); @@ -561,7 +575,7 @@ Status ScanRange::ReadFromCache( // 'mem_tracker' is nullptr because the memory is owned by the HDFS java client, // not the Impala backend. 
unique_ptr<BufferDescriptor> desc = unique_ptr<BufferDescriptor>(new BufferDescriptor( - io_mgr_, reader_, this, reinterpret_cast<uint8_t*>(buffer), 0, nullptr)); + io_mgr_, reader_, this, reinterpret_cast<uint8_t*>(buffer), 0)); desc->len_ = bytes_read; desc->scan_range_offset_ = 0; desc->eosr_ = true; http://git-wip-us.apache.org/repos/asf/impala/blob/65680dc4/be/src/runtime/mem-tracker.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h index 10a3424..8d28f8f 100644 --- a/be/src/runtime/mem-tracker.h +++ b/be/src/runtime/mem-tracker.h @@ -107,7 +107,6 @@ class MemTracker { /// destruction to prevent other threads from getting a reference to the MemTracker /// via its parent. Only used to deregister the query-level MemTracker from the /// global hierarchy. - /// TODO: IMPALA-3200: this is also used by BufferedBlockMgr, which will be deleted. void CloseAndUnregisterFromParent(); /// Include counters from a ReservationTracker in logs and other diagnostics. http://git-wip-us.apache.org/repos/asf/impala/blob/65680dc4/be/src/runtime/test-env.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc index 770eaba..0d6c6f0 100644 --- a/be/src/runtime/test-env.cc +++ b/be/src/runtime/test-env.cc @@ -49,7 +49,7 @@ Status TestEnv::Init() { exec_env_.reset(new ExecEnv); // Populate the ExecEnv state that the backend tests need. exec_env_->mem_tracker_.reset(new MemTracker(-1, "Process")); - RETURN_IF_ERROR(exec_env_->disk_io_mgr()->Init(exec_env_->process_mem_tracker())); + RETURN_IF_ERROR(exec_env_->disk_io_mgr()->Init()); exec_env_->metrics_.reset(new MetricGroup("test-env-metrics")); exec_env_->tmp_file_mgr_.reset(new TmpFileMgr); if (have_tmp_file_mgr_args_) { http://git-wip-us.apache.org/repos/asf/impala/blob/65680dc4/be/src/runtime/tmp-file-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc index 3091c58..9161b63 100644 --- a/be/src/runtime/tmp-file-mgr-test.cc +++ b/be/src/runtime/tmp-file-mgr-test.cc @@ -25,6 +25,7 @@ #include <gtest/gtest.h> #include "common/init.h" +#include "runtime/io/request-context.h" #include "runtime/test-env.h" #include "runtime/tmp-file-mgr-internal.h" #include "runtime/tmp-file-mgr.h" @@ -134,7 +135,7 @@ class TmpFileMgrTest : public ::testing::Test { /// Helper to cancel the FileGroup RequestContext. static void CancelIoContext(TmpFileMgr::FileGroup* group) { - group->io_mgr_->CancelContext(group->io_ctx_.get()); + group->io_ctx_->Cancel(); } /// Helper to get the # of bytes allocated by the group. Validates that the sum across http://git-wip-us.apache.org/repos/asf/impala/blob/65680dc4/be/src/runtime/tmp-file-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc index 3807670..7b00179 100644 --- a/be/src/runtime/tmp-file-mgr.cc +++ b/be/src/runtime/tmp-file-mgr.cc @@ -426,7 +426,7 @@ Status TmpFileMgr::FileGroup::WaitForAsyncRead(WriteHandle* handle, MemRange buf } exit: // Always return the buffer before exiting to avoid leaking it. 
- if (io_mgr_buffer != nullptr) io_mgr_->ReturnBuffer(move(io_mgr_buffer)); + if (io_mgr_buffer != nullptr) handle->read_range_->ReturnBuffer(move(io_mgr_buffer)); handle->read_range_ = nullptr; return status; } http://git-wip-us.apache.org/repos/asf/impala/blob/65680dc4/be/src/util/impalad-metrics.cc ---------------------------------------------------------------------- diff --git a/be/src/util/impalad-metrics.cc b/be/src/util/impalad-metrics.cc index 8f5f1be..32320d8 100644 --- a/be/src/util/impalad-metrics.cc +++ b/be/src/util/impalad-metrics.cc @@ -46,12 +46,6 @@ const char* ImpaladMetricKeys::HASH_TABLE_TOTAL_BYTES = "impala-server.hash-table.total-bytes"; const char* ImpaladMetricKeys::IO_MGR_NUM_OPEN_FILES = "impala-server.io-mgr.num-open-files"; -const char* ImpaladMetricKeys::IO_MGR_NUM_BUFFERS = - "impala-server.io-mgr.num-buffers"; -const char* ImpaladMetricKeys::IO_MGR_TOTAL_BYTES = - "impala-server.io-mgr.total-bytes"; -const char* ImpaladMetricKeys::IO_MGR_NUM_UNUSED_BUFFERS = - "impala-server.io-mgr.num-unused-buffers"; const char* ImpaladMetricKeys::IO_MGR_BYTES_READ = "impala-server.io-mgr.bytes-read"; const char* ImpaladMetricKeys::IO_MGR_LOCAL_BYTES_READ = @@ -208,11 +202,8 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) { ImpaladMetricKeys::NUM_FILES_OPEN_FOR_INSERT, 0); // Initialize IO mgr metrics - IO_MGR_NUM_OPEN_FILES = m->AddGauge(ImpaladMetricKeys::IO_MGR_NUM_OPEN_FILES, 0); - IO_MGR_NUM_BUFFERS = m->AddGauge(ImpaladMetricKeys::IO_MGR_NUM_BUFFERS, 0); - IO_MGR_TOTAL_BYTES = m->AddGauge(ImpaladMetricKeys::IO_MGR_TOTAL_BYTES, 0); - IO_MGR_NUM_UNUSED_BUFFERS = m->AddGauge( - ImpaladMetricKeys::IO_MGR_NUM_UNUSED_BUFFERS, 0); + IO_MGR_NUM_OPEN_FILES = m->AddGauge( + ImpaladMetricKeys::IO_MGR_NUM_OPEN_FILES, 0); IO_MGR_NUM_CACHED_FILE_HANDLES = m->AddGauge( ImpaladMetricKeys::IO_MGR_NUM_CACHED_FILE_HANDLES, 0); IO_MGR_NUM_FILE_HANDLES_OUTSTANDING = m->AddGauge( http://git-wip-us.apache.org/repos/asf/impala/blob/65680dc4/be/src/util/impalad-metrics.h ---------------------------------------------------------------------- diff --git a/be/src/util/impalad-metrics.h b/be/src/util/impalad-metrics.h index f32e3fa..a62c4c6 100644 --- a/be/src/util/impalad-metrics.h +++ b/be/src/util/impalad-metrics.h @@ -67,15 +67,6 @@ class ImpaladMetricKeys { /// Number of files currently opened by the io mgr static const char* IO_MGR_NUM_OPEN_FILES; - /// Number of IO buffers allocated by the io mgr - static const char* IO_MGR_NUM_BUFFERS; - - /// Number of bytes used by IO buffers (used and unused). - static const char* IO_MGR_TOTAL_BYTES; - - /// Number of IO buffers that are currently unused (and can be GC'ed) - static const char* IO_MGR_NUM_UNUSED_BUFFERS; - /// Total number of bytes read by the io mgr static const char* IO_MGR_BYTES_READ;
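Finally, the locking rule for 'cancel_status_' documented in request-ranges.h (writers must hold both 'lock_' and 'hdfs_lock_', readers may hold either) is what lets Open(), Read() and ReadFromCache() test the flag under 'hdfs_lock_' alone while Cancel() cannot race past a thread inside an HDFS call. A minimal sketch of that rule, using std::mutex and a string where the patch uses boost mutexes and Status; the CancellableRange class is hypothetical:

#include <mutex>
#include <string>

// Hypothetical illustration of the asymmetric lock rule for a cancel flag:
// writers take both locks, readers take either one, so a reader inside a
// critical section cannot observe the flag changing under it.
class CancellableRange {
 public:
  // Writer: takes both locks, 'lock_' first, matching the documented order.
  void Cancel(const std::string& reason) {
    std::unique_lock<std::mutex> range_lock(lock_);
    std::unique_lock<std::mutex> io_lock(hdfs_lock_);
    if (cancel_reason_.empty()) cancel_reason_ = reason;  // keep first cause
  }

  // Reader: 'hdfs_lock_' alone gives a stable view for the duration of the
  // critical section, the way ScanRange::Read() checks 'cancel_status_'.
  bool CancelledForIo() {
    std::unique_lock<std::mutex> io_lock(hdfs_lock_);
    return !cancel_reason_.empty();
  }

 private:
  std::mutex lock_;            // must be acquired before 'hdfs_lock_'
  std::mutex hdfs_lock_;
  std::string cancel_reason_;  // stands in for 'Status cancel_status_'
};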