http://git-wip-us.apache.org/repos/asf/impala/blob/5699b59d/be/src/runtime/io/request-context.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/request-context.cc b/be/src/runtime/io/request-context.cc index 031b976..b124702 100644 --- a/be/src/runtime/io/request-context.cc +++ b/be/src/runtime/io/request-context.cc @@ -36,30 +36,6 @@ BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr, DCHECK_GE(buffer_len, 0); } -Status RequestContext::AllocBuffer(ScanRange* range, int64_t buffer_size, - unique_ptr<BufferDescriptor>* buffer_desc) { - DCHECK(range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER) - << static_cast<int>(range->external_buffer_tag_); - DCHECK_LE(buffer_size, parent_->max_buffer_size_); - DCHECK_GT(buffer_size, 0); - buffer_size = BitUtil::RoundUpToPowerOfTwo( - max(parent_->min_buffer_size_, min(parent_->max_buffer_size_, buffer_size))); - - DCHECK(mem_tracker_ != nullptr); - if (!mem_tracker_->TryConsume(buffer_size)) { - return mem_tracker_->MemLimitExceeded(nullptr, "disk I/O buffer", buffer_size); - } - - uint8_t* buffer = reinterpret_cast<uint8_t*>(malloc(buffer_size)); - if (buffer == nullptr) { - mem_tracker_->Release(buffer_size); - return Status(TErrorCode::INTERNAL_ERROR, - Substitute("Could not malloc buffer of $0 bytes")); - } - buffer_desc->reset(new BufferDescriptor(parent_, this, range, buffer, buffer_size)); - return Status::OK(); -} - void RequestContext::FreeBuffer(BufferDescriptor* buffer) { DCHECK(buffer->buffer_ != nullptr); if (!buffer->is_cached() && !buffer->is_client_buffer()) { @@ -108,40 +84,26 @@ void RequestContext::Cancel() { // The reader will be put into a cancelled state until call cleanup is complete. state_ = RequestContext::Cancelled; - // Cancel all scan ranges for this reader. Each range could be one one of - // four queues. - for (int i = 0; i < disk_states_.size(); ++i) { - PerDiskState& state = disk_states_[i]; - RequestRange* range = nullptr; - while ((range = state.in_flight_ranges()->Dequeue()) != nullptr) { - if (range->request_type() == RequestType::READ) { - static_cast<ScanRange*>(range)->Cancel(Status::CANCELLED); - } else { - DCHECK(range->request_type() == RequestType::WRITE); + // Clear out all request ranges from queues for this reader. Cancel the scan + // ranges and invoke the write range callbacks to propagate the cancellation. 
+ for (ScanRange* range : active_scan_ranges_) range->CancelInternal(Status::CANCELLED); + active_scan_ranges_.clear(); + for (PerDiskState& disk_state : disk_states_) { + RequestRange* range; + while ((range = disk_state.in_flight_ranges()->Dequeue()) != nullptr) { + if (range->request_type() == RequestType::WRITE) { write_callbacks.push_back(static_cast<WriteRange*>(range)->callback_); } } - - ScanRange* scan_range; - while ((scan_range = state.unstarted_scan_ranges()->Dequeue()) != nullptr) { - scan_range->Cancel(Status::CANCELLED); - } + while (disk_state.unstarted_scan_ranges()->Dequeue() != nullptr); WriteRange* write_range; - while ((write_range = state.unstarted_write_ranges()->Dequeue()) != nullptr) { + while ((write_range = disk_state.unstarted_write_ranges()->Dequeue()) != nullptr) { write_callbacks.push_back(write_range->callback_); } } - - ScanRange* range = nullptr; - while ((range = ready_to_start_ranges_.Dequeue()) != nullptr) { - range->Cancel(Status::CANCELLED); - } - while ((range = blocked_ranges_.Dequeue()) != nullptr) { - range->Cancel(Status::CANCELLED); - } - while ((range = cached_ranges_.Dequeue()) != nullptr) { - range->Cancel(Status::CANCELLED); - } + // Clear out the lists of scan ranges. + while (ready_to_start_ranges_.Dequeue() != nullptr); + while (cached_ranges_.Dequeue() != nullptr); // Ensure that the reader is scheduled on all disks (it may already be scheduled on // some). The disk threads will notice that the context is cancelled and do any @@ -170,9 +132,8 @@ void RequestContext::CancelAndMarkInactive() { // Wait until the ranges finish up. while (num_disks_with_ranges_ > 0) disks_complete_cond_var_.Wait(l); - // Validate that no buffers were leaked from this context. - DCHECK_EQ(num_buffers_in_reader_.Load(), 0) << endl << DebugString(); - DCHECK_EQ(num_used_buffers_.Load(), 0) << endl << DebugString(); + // Validate that no ranges are active. + DCHECK_EQ(0, active_scan_ranges_.size()) << endl << DebugString(); // Validate that no threads are active and the context is not queued. for (const PerDiskState& disk_state : disk_states_) { @@ -185,42 +146,58 @@ void RequestContext::CancelAndMarkInactive() { state_ = Inactive; } -void RequestContext::AddRequestRange(const unique_lock<mutex>& lock, - RequestRange* range, bool schedule_immediately) { +void RequestContext::AddRangeToDisk(const unique_lock<mutex>& lock, + RequestRange* range, ScheduleMode schedule_mode) { DCHECK(lock.mutex() == &lock_ && lock.owns_lock()); - PerDiskState& state = disk_states_[range->disk_id()]; - if (state.done()) { - DCHECK_EQ(state.num_remaining_ranges(), 0); - state.set_done(false); + DCHECK_EQ(state_, Active) << DebugString(); + PerDiskState* disk_state = &disk_states_[range->disk_id()]; + if (disk_state->done()) { + DCHECK_EQ(disk_state->num_remaining_ranges(), 0); + disk_state->set_done(false); ++num_disks_with_ranges_; } - - bool schedule_context; if (range->request_type() == RequestType::READ) { ScanRange* scan_range = static_cast<ScanRange*>(range); - if (schedule_immediately) { + if (schedule_mode == ScheduleMode::IMMEDIATELY) { ScheduleScanRange(lock, scan_range); - } else { - state.unstarted_scan_ranges()->Enqueue(scan_range); + } else if (schedule_mode == ScheduleMode::UPON_GETNEXT) { + disk_state->unstarted_scan_ranges()->Enqueue(scan_range); num_unstarted_scan_ranges_.Add(1); + // If there's no 'next_scan_range_to_start', schedule this RequestContext so that + // one of the 'unstarted_scan_ranges' will become the 'next_scan_range_to_start'. 
+ if (disk_state->next_scan_range_to_start() == nullptr) { + disk_state->ScheduleContext(lock, this, range->disk_id()); + } } - // If next_scan_range_to_start is NULL, schedule this RequestContext so that it will - // be set. If it's not NULL, this context will be scheduled when GetNextRange() is - // invoked. - schedule_context = state.next_scan_range_to_start() == NULL; } else { DCHECK(range->request_type() == RequestType::WRITE); - DCHECK(!schedule_immediately); + DCHECK(schedule_mode == ScheduleMode::IMMEDIATELY) << static_cast<int>(schedule_mode); WriteRange* write_range = static_cast<WriteRange*>(range); - state.unstarted_write_ranges()->Enqueue(write_range); + disk_state->unstarted_write_ranges()->Enqueue(write_range); - // ScheduleContext() has no effect if the context is already scheduled, - // so this is safe. - schedule_context = true; + // Ensure that the context is scheduled so that the write range gets picked up. + // ScheduleContext() has no effect if already scheduled, so this is safe to do always. + disk_state->ScheduleContext(lock, this, range->disk_id()); } + ++disk_state->num_remaining_ranges(); +} + +void RequestContext::AddActiveScanRangeLocked( + const unique_lock<mutex>& lock, ScanRange* range) { + DCHECK(lock.mutex() == &lock_ && lock.owns_lock()); + DCHECK(state_ == Active); + active_scan_ranges_.insert(range); +} + +void RequestContext::RemoveActiveScanRange(ScanRange* range) { + unique_lock<mutex> lock(lock_); + RemoveActiveScanRangeLocked(lock, range); +} - if (schedule_context) state.ScheduleContext(lock, this, range->disk_id()); - ++state.num_remaining_ranges(); +void RequestContext::RemoveActiveScanRangeLocked( + const unique_lock<mutex>& lock, ScanRange* range) { + DCHECK(lock.mutex() == &lock_ && lock.owns_lock()); + active_scan_ranges_.erase(range); } RequestContext::RequestContext( @@ -235,12 +212,9 @@ string RequestContext::DebugString() const { if (state_ == RequestContext::Cancelled) ss << "Cancelled"; if (state_ == RequestContext::Active) ss << "Active"; if (state_ != RequestContext::Inactive) { - ss << " #ready_buffers=" << num_ready_buffers_.Load() - << " #used_buffers=" << num_used_buffers_.Load() - << " #num_buffers_in_reader=" << num_buffers_in_reader_.Load() - << " #finished_scan_ranges=" << num_finished_ranges_.Load() - << " #disk_with_ranges=" << num_disks_with_ranges_ - << " #disks=" << num_disks_with_ranges_; + ss << " #disk_with_ranges=" << num_disks_with_ranges_ + << " #disks=" << num_disks_with_ranges_ + << " #active scan ranges=" << active_scan_ranges_.size(); for (int i = 0; i < disk_states_.size(); ++i) { ss << endl << " " << i << ": " << "is_on_queue=" << disk_states_[i].is_on_queue() @@ -263,16 +237,6 @@ bool RequestContext::Validate() const { return false; } - if (num_used_buffers_.Load() < 0) { - LOG(WARNING) << "num_used_buffers_ < 0: #used=" << num_used_buffers_.Load(); - return false; - } - - if (num_ready_buffers_.Load() < 0) { - LOG(WARNING) << "num_ready_buffers_ < 0: #used=" << num_ready_buffers_.Load(); - return false; - } - int total_unstarted_ranges = 0; for (int i = 0; i < disk_states_.size(); ++i) { const PerDiskState& state = disk_states_[i]; @@ -350,8 +314,8 @@ bool RequestContext::Validate() const { LOG(WARNING) << "Reader cancelled but has ready to start ranges."; return false; } - if (!blocked_ranges_.empty()) { - LOG(WARNING) << "Reader cancelled but has blocked ranges."; + if (!active_scan_ranges_.empty()) { + LOG(WARNING) << "Reader cancelled but has active ranges."; return false; } }
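For orientation, the restructured RequestContext::Cancel() above boils down to the pattern below. This is an illustrative sketch, not the patch itself: 'Range', 'CancelInternal' and the plain STL containers are stand-ins for the RequestRange/ScanRange types and the InternalQueue/InternalList structures, and all locking and per-disk iteration is omitted.

#include <deque>
#include <functional>
#include <unordered_set>
#include <vector>

// Stand-in type: models both ScanRange and WriteRange for this sketch.
struct Range {
  bool is_write = false;
  std::function<void()> callback;  // set only for write ranges
  void CancelInternal() { /* set cancel_status_, drain ready buffers */ }
};

void CancelContext(std::unordered_set<Range*>* active_scan_ranges,
    std::deque<Range*>* in_flight_ranges, std::deque<Range*>* unstarted_scan_ranges,
    std::deque<Range*>* unstarted_write_ranges,
    std::vector<std::function<void()>>* write_callbacks) {
  // Cancel every scan range via the tracking set instead of hunting each one
  // down on whichever queue it currently occupies.
  for (Range* r : *active_scan_ranges) r->CancelInternal();
  active_scan_ranges->clear();
  // The queues then only need to be drained; scan ranges on them were already
  // cancelled above. Write callbacks are collected so the client can be
  // notified of the cancellation.
  while (!in_flight_ranges->empty()) {
    Range* r = in_flight_ranges->front();
    in_flight_ranges->pop_front();
    if (r->is_write) write_callbacks->push_back(r->callback);
  }
  unstarted_scan_ranges->clear();
  while (!unstarted_write_ranges->empty()) {
    write_callbacks->push_back(unstarted_write_ranges->front()->callback);
    unstarted_write_ranges->pop_front();
  }
}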
http://git-wip-us.apache.org/repos/asf/impala/blob/5699b59d/be/src/runtime/io/request-context.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/request-context.h b/be/src/runtime/io/request-context.h index 737a16c..3aea2bc 100644 --- a/be/src/runtime/io/request-context.h +++ b/be/src/runtime/io/request-context.h @@ -23,6 +23,11 @@ namespace impala { namespace io { + +// Mode argument for AddRangeToDisk(). +enum class ScheduleMode { + IMMEDIATELY, UPON_GETNEXT, BY_CALLER +}; /// A request context is used to group together I/O requests belonging to a client of the /// I/O manager for management and scheduling. /// @@ -32,53 +37,59 @@ namespace io { /// maintains state across all disks as well as per disk state. /// The unit for an IO request is a RequestRange, which may be a ScanRange or a /// WriteRange. -/// A scan range for the reader is on one of five states: -/// 1) PerDiskState's unstarted_ranges: This range has only been queued +/// A scan range for the reader is in one of six states: +/// 1) PerDiskState's 'unstarted_scan_ranges_': This range has only been queued +/// and nothing has been read from it. -/// 2) RequestContext's ready_to_start_ranges_: This range is about to be started. -/// As soon as the reader picks it up, it will move to the in_flight_ranges +/// 2) RequestContext's 'ready_to_start_ranges_': This range is about to be started. +/// As soon as the reader picks it up, it will move to the 'in_flight_ranges_' /// queue. -/// 3) PerDiskState's in_flight_ranges: This range is being processed and will +/// 3) PerDiskState's 'in_flight_ranges_': This range is being processed and will /// be read from the next time a disk thread picks it up in GetNextRequestRange() -/// 4) ScanRange's outgoing ready buffers is full. We can't read for this range -/// anymore. We need the caller to pull a buffer off which will put this in -/// the in_flight_ranges queue. These ranges are in the RequestContext's -/// blocked_ranges_ queue. -/// 5) ScanRange is cached and in the cached_ranges_ queue. -// -/// If the scan range is read and does not get blocked on the outgoing queue, the +/// 4) The ScanRange is blocked waiting for buffers because it does not have any unused +/// buffers to read data into. It is unblocked when a client adds new buffers via +/// AllocateBuffersForRange() or returns existing buffers via ReturnBuffer(). +/// ScanRanges in this state are identified by 'blocked_on_buffer_' == true. +/// 5) ScanRange is cached and in the 'cached_ranges_' queue. +/// 6) Inactive - either all the data for the range was returned or the range was +/// cancelled. I.e. ScanRange::eosr_ is true or ScanRange::cancel_status_ != OK. +/// +/// If the scan range is read and does not get blocked waiting for buffers, the /// transitions are: 1 -> 2 -> 3. /// If the scan range does get blocked, the transitions are /// 1 -> 2 -> 3 -> (4 -> 3)* -// -/// In the case of a cached scan range, the range is immediately put in cached_ranges_. +/// +/// In the case of a cached scan range, the range is immediately put in 'cached_ranges_'. /// When the caller asks for the next range to process, we first pull ranges from -/// the cache_ranges_ queue. If the range was cached, the range is removed and +/// the 'cached_ranges_' queue. If the range was cached, the range is removed and /// done (ranges are either entirely cached or not at all). If the cached read attempt /// fails, we put the range in state 1.
-// -/// A write range for a context may be in one of two lists: -/// 1) unstarted_write_ranges_ : Ranges that have been queued but not processed. -/// 2) in_flight_ranges_: The write range is ready to be processed by the next disk thread -/// that picks it up in GetNextRequestRange(). -// +/// +/// All scan ranges in states 1-5 are tracked in 'active_scan_ranges_' so that they can be +/// cancelled when the RequestContext is cancelled. Scan ranges are removed from +/// 'active_scan_ranges_' during their transition to state 6. +/// +/// A write range for a context may be in one of two queues: +/// 1) 'unstarted_write_ranges_': Ranges that have been queued but not processed. +/// 2) 'in_flight_ranges_': The write range is ready to be processed by the next disk +/// thread that picks it up in GetNextRequestRange(). +/// /// AddWriteRange() adds WriteRanges for a disk. /// It is the responsibility of the client to pin the data to be written via a WriteRange /// in memory. After a WriteRange has been written, a callback is invoked to inform the /// client that the write has completed. -// +/// /// An important assumption is that write does not exceed the maximum read size and that /// the entire range is written when the write request is handled. (In other words, writes /// are not broken up.) -// +/// /// When a RequestContext is processed by a disk thread in GetNextRequestRange(), /// a write range is always removed from the list of unstarted write ranges and appended /// to the in_flight_ranges_ queue. This is done to alternate reads and writes - a read -/// that is scheduled (by calling GetNextRange()) is always followed by a write (if one -/// exists). And since at most one WriteRange can be present in in_flight_ranges_ at any -/// time (once a write range is returned from GetNetxRequestRange() it is completed an -/// not re-enqueued), a scan range scheduled via a call to GetNextRange() can be queued up -/// behind at most one write range. +/// that is scheduled (by calling GetNextUnstartedRange()) is always followed by a write +/// (if one exists). And since at most one WriteRange can be present in in_flight_ranges_ +/// at any time (once a write range is returned from GetNextRequestRange() it is completed +/// and not re-enqueued), a scan range scheduled via a call to GetNextUnstartedRange() can +/// be queued up behind at most one write range. class RequestContext { public: ~RequestContext() { @@ -97,7 +108,6 @@ class RequestContext { return state_ == Cancelled; } - int64_t queue_size() const { return num_ready_buffers_.Load(); } int64_t bytes_read_local() const { return bytes_read_local_.Load(); } int64_t bytes_read_short_circuit() const { return bytes_read_short_circuit_.Load(); } int64_t bytes_read_dn_cache() const { return bytes_read_dn_cache_.Load(); } @@ -150,13 +160,6 @@ class RequestContext { RequestContext(DiskIoMgr* parent, int num_disks, MemTracker* tracker); - /// Allocates a buffer to read into with size between - /// max('buffer_size', 'min_buffer_size_') and 'max_buffer_size_'. - /// Does not acquire 'lock_'. - /// TODO: allocate using the buffer pool client associated with this reader. - Status AllocBuffer(ScanRange* range, int64_t buffer_size, - std::unique_ptr<BufferDescriptor>* buffer); - /// Cleans up a buffer. If the buffer was allocated with AllocBuffer(), frees the buffer /// memory and release the consumption to the client MemTracker. Otherwise (e.g. a /// client or HDFS cache buffer), just prepares the descriptor to be destroyed.
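To make the six-state lifecycle documented above concrete, here is a hypothetical explicit encoding. The real code carries no such enum; a range's state is implied by which queue it sits on plus the 'blocked_on_buffer_', 'eosr_queued_' and 'cancel_status_' fields.

// Hypothetical explicit encoding of the scan range lifecycle (illustration only).
enum class ScanRangeState {
  UNSTARTED,          // 1) on PerDiskState's 'unstarted_scan_ranges_'
  READY_TO_START,     // 2) on RequestContext's 'ready_to_start_ranges_'
  IN_FLIGHT,          // 3) on PerDiskState's 'in_flight_ranges_'
  BLOCKED_ON_BUFFER,  // 4) no unused buffer to read into ('blocked_on_buffer_')
  CACHED,             // 5) on 'cached_ranges_'
  INACTIVE,           // 6) eosr reached or cancelled; off 'active_scan_ranges_'
};
// Unblocked read:      UNSTARTED -> READY_TO_START -> IN_FLIGHT -> INACTIVE
// With back-pressure:  ... -> IN_FLIGHT -> (BLOCKED_ON_BUFFER -> IN_FLIGHT)* -> INACTIVE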
@@ -184,11 +187,11 @@ class RequestContext { /// Adds range to in_flight_ranges, scheduling this reader on the disk threads /// if necessary. - /// 'lock_' must be held via 'lock' + /// 'lock_' must be held via 'lock'. Only valid to call if this context is active. void ScheduleScanRange(const boost::unique_lock<boost::mutex>& lock, ScanRange* range) { DCHECK(lock.mutex() == &lock_ && lock.owns_lock()); DCHECK_EQ(state_, Active); - DCHECK(range != NULL); + DCHECK(range != nullptr); RequestContext::PerDiskState& state = disk_states_[range->disk_id()]; state.in_flight_ranges()->Enqueue(range); state.ScheduleContext(lock, this, range->disk_id()); @@ -198,11 +201,34 @@ /// and mark the context as inactive, after which it cannot be used. void CancelAndMarkInactive(); - /// Adds request range to disk queue for this request context. Currently, - /// schedule_immediately must be false is RequestRange is a write range. - /// Caller must hold 'lock_' via 'lock'. - void AddRequestRange(const boost::unique_lock<boost::mutex>& lock, - RequestRange* range, bool schedule_immediately); + /// Adds a request range to the appropriate disk state. 'schedule_mode' controls which + /// queue the range is placed in. This RequestContext is scheduled on the disk state + /// if required by 'schedule_mode'. + /// + /// Write ranges must always have 'schedule_mode' IMMEDIATELY and are added to the + /// 'unstarted_write_ranges_' queue, from which they will be asynchronously moved to the + /// 'in_flight_ranges_' queue. + /// + /// Scan ranges can have different 'schedule_mode' values. If IMMEDIATELY, the range is + /// immediately added to the 'in_flight_ranges_' queue where it will be processed + /// asynchronously by disk threads. If UPON_GETNEXT, the range is added to the + /// 'unstarted_scan_ranges_' queue, from which it can be returned to a client by + /// DiskIoMgr::GetNextUnstartedRange(). If BY_CALLER, the scan range is not added to + /// any queues. The range will be scheduled later as a separate step, e.g. when it is + /// unblocked by adding buffers to it. Caller must hold 'lock_' via 'lock'. + void AddRangeToDisk(const boost::unique_lock<boost::mutex>& lock, RequestRange* range, + ScheduleMode schedule_mode); + + /// Adds an active range to 'active_scan_ranges_'. + void AddActiveScanRangeLocked( + const boost::unique_lock<boost::mutex>& lock, ScanRange* range); + + /// Removes the range from 'active_scan_ranges_'. Called by ScanRange after eos or + /// cancellation. If calling the Locked version, the caller must hold + /// 'lock_'. Otherwise the function will acquire 'lock_'. + void RemoveActiveScanRange(ScanRange* range); + void RemoveActiveScanRangeLocked( + const boost::unique_lock<boost::mutex>& lock, ScanRange* range); /// Validates invariants of reader. Reader lock must be taken beforehand. bool Validate() const; @@ -243,13 +269,6 @@ class RequestContext { /// Total number of bytes from remote reads that were expected to be local. AtomicInt64 unexpected_remote_bytes_{0}; - /// The number of buffers that have been returned to the reader (via GetNext()) that the - /// reader has not returned. Only included for debugging and diagnostics. - AtomicInt32 num_buffers_in_reader_{0}; - - /// The number of scan ranges that have been completed for this reader. - AtomicInt32 num_finished_ranges_{0}; - /// The number of scan ranges that required a remote read, updated at the end of each /// range scan. Only used for diagnostics.
AtomicInt32 num_remote_ranges_{0}; @@ -264,17 +283,6 @@ class RequestContext { /// Total number of file handle opens where the file handle was not in the cache AtomicInt32 cached_file_handles_miss_count_{0}; - /// The number of buffers that are being used for this reader. This is the sum - /// of all buffers in ScanRange queues and buffers currently being read into (i.e. about - /// to be queued). This includes both IOMgr-allocated buffers and client-provided - /// buffers. - AtomicInt32 num_used_buffers_{0}; - - /// The total number of ready buffers across all ranges. Ready buffers are buffers - /// that have been read from disk but not retrieved by the caller. - /// This is the sum of all queued buffers in all ranges for this reader context. - AtomicInt32 num_ready_buffers_{0}; - /// All fields below are accessed by multiple threads and the lock needs to be /// taken before accessing them. Must be acquired before ScanRange::lock_ if both /// are held simultaneously. @@ -283,6 +291,17 @@ class RequestContext { /// Current state of the reader State state_ = Active; + /// Scan ranges that have been added to the IO mgr for this context. Ranges can only + /// be added when 'state_' is Active. When this context is cancelled, Cancel() is + /// called for all the active ranges. If a client attempts to add a range while + /// 'state_' is Cancelled, the range is not added to this list and Status::CANCELLED + /// is returned to the client. This ensures that all active ranges are cancelled as a + /// result of RequestContext cancellation. + /// Ranges can be cancelled or hit eos non-atomically with their removal from this set, + /// so eos or cancelled ranges may be temporarily present here. Cancelling these ranges + /// a second time or cancelling after eos is safe and has no effect. + boost::unordered_set<ScanRange*> active_scan_ranges_; + /// The number of disks with scan ranges remaining (always equal to the sum of /// disks with ranges). int num_disks_with_ranges_ = 0; @@ -293,18 +312,15 @@ class RequestContext { InternalList<ScanRange> cached_ranges_; /// A list of ranges that should be returned in subsequent calls to - /// GetNextRange. + /// GetNextUnstartedRange(). /// There is a trade-off with when to populate this list. Populating it on - /// demand means consumers need to wait (happens in DiskIoMgr::GetNextRange()). + /// demand means consumers need to wait (happens in DiskIoMgr::GetNextUnstartedRange()). /// Populating it preemptively means we make worse scheduling decisions. /// We currently populate one range per disk. /// TODO: think about this some more. InternalList<ScanRange> ready_to_start_ranges_; ConditionVariable ready_to_start_ranges_cv_; // used with lock_ - /// Ranges that are blocked due to back pressure on outgoing buffers. - InternalList<ScanRange> blocked_ranges_; - /// Condition variable for UnregisterContext() to wait for all disks to complete ConditionVariable disks_complete_cond_var_; @@ -429,7 +445,7 @@ class RequestContext { /// Queue of pending IO requests for this disk in the order that they will be /// processed. A ScanRange is added to this queue when it is returned in - /// GetNextRange(), or when it is added with schedule_immediately = true. + /// GetNextUnstartedRange(), or when it is added with schedule_mode == IMMEDIATELY. /// A WriteRange is added to this queue from unstarted_write_ranges_ for each /// invocation of GetNextRequestRange() in WorkLoop(). /// The size of this queue is always less than or equal to num_remaining_ranges. 
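The ScheduleMode rules spelled out in the AddRangeToDisk() comment above can be condensed into the following sketch. This is a simplified restatement for illustration, not the actual implementation; locking and the per-disk bookkeeping are omitted.

#include <cassert>

enum class ScheduleMode { IMMEDIATELY, UPON_GETNEXT, BY_CALLER };

// Returns the name of the queue a range would land on under each mode, or
// nullptr for BY_CALLER, which does not enqueue. 'is_write' distinguishes a
// WriteRange from a ScanRange.
const char* QueueFor(ScheduleMode mode, bool is_write) {
  if (is_write) {
    // Write ranges must always be added with IMMEDIATELY. They start on
    // 'unstarted_write_ranges_' and are moved to 'in_flight_ranges_'
    // asynchronously by GetNextRequestRange().
    assert(mode == ScheduleMode::IMMEDIATELY);
    return "unstarted_write_ranges_";
  }
  switch (mode) {
    case ScheduleMode::IMMEDIATELY:
      return "in_flight_ranges_";       // picked up directly by disk threads
    case ScheduleMode::UPON_GETNEXT:
      return "unstarted_scan_ranges_";  // handed out by GetNextUnstartedRange()
    case ScheduleMode::BY_CALLER:
      return nullptr;                   // scheduled later, e.g. when unblocked
  }
  return nullptr;
}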
@@ -458,7 +474,8 @@ class RequestContext { /// unstarted_read_ranges_ and unstarted_write_ranges_ to alternate between reads /// and writes. (Otherwise, since next_scan_range_to_start is set /// in GetNextRequestRange() whenever it is null, repeated calls to - /// GetNextRequestRange() and GetNextRange() may result in only reads being processed) + /// GetNextRequestRange() and GetNextUnstartedRange() may result in only reads being + /// processed) InternalQueue<WriteRange> unstarted_write_ranges_; }; http://git-wip-us.apache.org/repos/asf/impala/blob/5699b59d/be/src/runtime/io/request-ranges.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h index ab0810a..041cb9d 100644 --- a/be/src/runtime/io/request-ranges.h +++ b/be/src/runtime/io/request-ranges.h @@ -26,8 +26,10 @@ #include "common/atomic.h" #include "common/hdfs.h" #include "common/status.h" +#include "runtime/bufferpool/buffer-pool.h" #include "util/condition-variable.h" #include "util/internal-queue.h" +#include "util/mem-range.h" namespace impala { class MemTracker; @@ -202,11 +204,11 @@ class ScanRange : public RequestRange { /// Resets this scan range object with the scan range description. The scan range /// is for bytes [offset, offset + len) in 'file' on 'fs' (which is nullptr for the - /// local filesystem). The scan range must fall within the file bounds (offset >= 0 - /// and offset + len <= file_length). 'disk_id' is the disk queue to add the range - /// to. If 'expected_local' is true, a warning is generated if the read did not - /// come from a local disk. 'buffer_opts' specifies buffer management options - - /// see the DiskIoMgr class comment and the BufferOpts comments for details. + /// local filesystem). The scan range must be non-empty and fall within the file bounds + /// (len > 0 and offset >= 0 and offset + len <= file_length). 'disk_id' is the disk + /// queue to add the range to. If 'expected_local' is true, a warning is generated if + /// the read did not come from a local disk. 'buffer_opts' specifies buffer management + /// options - see the DiskIoMgr class comment and the BufferOpts comments for details. /// 'meta_data' is an arbitrary client-provided pointer for any auxiliary data. void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id, bool expected_local, const BufferOpts& buffer_opts, void* meta_data = nullptr); @@ -248,12 +250,11 @@ class ScanRange : public RequestRange { /// Initialize internal fields void InitInternal(DiskIoMgr* io_mgr, RequestContext* reader); - /// Enqueues a buffer for this range. This does not block. - /// Returns true if this scan range has hit the queue capacity, false otherwise. + /// Enqueues a ready buffer with valid data for this range. This does not block. /// The caller passes ownership of buffer to the scan range and it is not /// valid to access buffer after this call. The reader lock must be held by the - /// caller. - bool EnqueueBuffer(const boost::unique_lock<boost::mutex>& reader_lock, + /// caller. Returns false if the scan range was cancelled. + bool EnqueueReadyBuffer(const boost::unique_lock<boost::mutex>& reader_lock, std::unique_ptr<BufferDescriptor> buffer); /// Validates the internal state of this range. 
lock_ must be taken @@ -304,16 +305,51 @@ class ScanRange : public RequestRange { Status ReadFromCache(const boost::unique_lock<boost::mutex>& reader_lock, bool* read_succeeded) WARN_UNUSED_RESULT; + /// Add buffers for the range to read data into and schedule the range if blocked. + /// If 'returned' is true, the buffers are ones returned from GetNext() that are being + /// recycled via ReturnBuffer(). Otherwise the buffers are newly allocated buffers to be added. + void AddUnusedBuffers( + std::vector<std::unique_ptr<BufferDescriptor>>&& buffers, bool returned); + + /// Remove a buffer from 'unused_iomgr_buffers_' and update + /// 'unused_iomgr_buffer_bytes_'. If 'unused_iomgr_buffers_' is empty, returns nullptr. + /// 'lock_' must be held by the caller via 'scan_range_lock'. + std::unique_ptr<BufferDescriptor> GetUnusedBuffer( + const boost::unique_lock<boost::mutex>& scan_range_lock); + + /// Get the next buffer for this scan range for a disk thread to read into. Returns + /// the new buffer if successful. If no buffers are available, marks the range + /// as blocked and returns nullptr. Caller must not hold 'lock_'. + std::unique_ptr<BufferDescriptor> GetNextUnusedBufferForRange(); + /// Cleans up a buffer that was not returned to the client. /// Either ReturnBuffer() or CleanUpBuffer() is called for every BufferDescriptor. - /// This function will acquire 'lock_' and may acquire 'hdfs_lock_'. - void CleanUpBuffer(std::unique_ptr<BufferDescriptor> buffer); - - /// Same as CleanUpBuffer() except the caller must already hold 'lock_' via - /// 'scan_range_lock'. - void CleanUpBufferLocked(const boost::unique_lock<boost::mutex>& scan_range_lock, + /// The caller must hold 'lock_' via 'scan_range_lock'. + /// This function may acquire 'hdfs_lock_'. + void CleanUpBuffer(const boost::unique_lock<boost::mutex>& scan_range_lock, std::unique_ptr<BufferDescriptor> buffer); + /// Same as CleanUpBuffer() except cleans up multiple buffers and caller must not + /// hold 'lock_'. + void CleanUpBuffers(std::vector<std::unique_ptr<BufferDescriptor>>&& buffers); + + /// Clean up all buffers in 'unused_iomgr_buffers_'. Only valid to call when the scan + /// range is cancelled or at eos. The caller must hold 'lock_' via 'scan_range_lock'. + void CleanUpUnusedBuffers(const boost::unique_lock<boost::mutex>& scan_range_lock); + + /// Same as Cancel() except reader_->lock must be held by the caller. + void CancelFromReader(const boost::unique_lock<boost::mutex>& reader_lock, + const Status& status); + + /// Same as Cancel() except doesn't remove the scan range from + /// reader_->active_scan_ranges_. This is invoked by RequestContext::Cancel(), + /// which removes the range itself to avoid invalidating its active_scan_ranges_ + /// iterator. + void CancelInternal(const Status& status); + + /// Marks the scan range as blocked waiting for a buffer. Caller must not hold 'lock_'. + void SetBlockedOnBuffer(); + /// Returns true if no more buffers will be returned to clients in the future, /// either because of hitting eosr or cancellation. bool all_buffers_returned(const boost::unique_lock<boost::mutex>& lock) const { @@ -386,6 +422,10 @@ struct hadoopRzBuffer* cached_buffer_ = nullptr; }; + /// The number of buffers that have been returned to a client via GetNext() that have + /// not yet been returned with ReturnBuffer(). + AtomicInt32 num_buffers_in_reader_{0}; + /// Lock protecting fields below. /// This lock should not be taken during Open()/Read()/Close().
/// If RequestContext::lock_ and this lock need to be held simultaneously, @@ -395,18 +435,30 @@ /// Number of bytes read so far for this scan range int bytes_read_; - /// The number of buffers that have been returned to a client via GetNext() that have - /// not yet been returned with ReturnBuffer(). - int num_buffers_in_reader_ = 0; + /// Buffers to read into, used if the 'external_buffer_tag_' is NO_BUFFER. These are + /// initially populated when the client calls AllocateBuffersForRange() + /// and are used to read scanned data into. Buffers are taken from this vector for + /// every read and added back, if needed, when the client calls ReturnBuffer(). + std::vector<std::unique_ptr<BufferDescriptor>> unused_iomgr_buffers_; + + /// Total number of bytes of buffers in 'unused_iomgr_buffers_'. + int64_t unused_iomgr_buffer_bytes_ = 0; + + /// Number of bytes of buffers returned from GetNextUnusedBufferForRange(). Used to + /// infer how many bytes of buffers need to be held onto to read the rest of the scan + /// range. + int64_t iomgr_buffer_bytes_returned_ = 0; /// If true, the last buffer for this scan range has been queued. /// If this is true and 'ready_buffers_' is empty, then no more buffers will be /// returned to the caller by this scan range. bool eosr_queued_ = false; - /// If true, this scan range has been removed from the reader's in_flight_ranges - /// queue because the ready_buffers_ queue is full. - bool blocked_on_queue_ = false; + /// If true, this scan range is not scheduled because a buffer is not available for + /// the next I/O in the range. This can happen when the scan range is initially created + /// or if the buffers added to the range have all been filled with data and not yet + /// returned. + bool blocked_on_buffer_ = false; /// IO buffers that are queued for this scan range. When Cancel() is called /// this is drained by the cancelling thread. I.e. this is always empty if http://git-wip-us.apache.org/repos/asf/impala/blob/5699b59d/be/src/runtime/io/scan-range.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc index 1ffba00..0663a2b 100644 --- a/be/src/runtime/io/scan-range.cc +++ b/be/src/runtime/io/scan-range.cc @@ -42,7 +42,7 @@ DEFINE_int64(adls_read_chunk_size, 128 * 1024, "The maximum read chunk size to u // any time and only one thread will remove from the queue. This is to guarantee // that buffers are queued and read in file order. -bool ScanRange::EnqueueBuffer( +bool ScanRange::EnqueueReadyBuffer( const unique_lock<mutex>& reader_lock, unique_ptr<BufferDescriptor> buffer) { DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock()); DCHECK(buffer->buffer_ != nullptr) << "Cannot enqueue freed buffer"; @@ -52,19 +52,17 @@ DCHECK(!eosr_queued_); if (!cancel_status_.ok()) { // This range has been cancelled, no need to enqueue the buffer. - reader_->num_used_buffers_.Add(-1); - CleanUpBufferLocked(scan_range_lock, move(buffer)); + CleanUpBuffer(scan_range_lock, move(buffer)); return false; } - reader_->num_ready_buffers_.Add(1); + // Clean up any surplus buffers. E.g. we may have allocated too many if the file was + // shorter than expected.
+ if (buffer->eosr()) CleanUpUnusedBuffers(scan_range_lock); eosr_queued_ = buffer->eosr(); ready_buffers_.emplace_back(move(buffer)); - - DCHECK_LE(ready_buffers_.size(), DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT); - blocked_on_queue_ = ready_buffers_.size() == DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT; } buffer_ready_cv_.NotifyOne(); - return blocked_on_queue_; + return true; } Status ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) { @@ -84,64 +82,97 @@ Status ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) { // Remove the first ready buffer from the queue and return it DCHECK(!ready_buffers_.empty()); - DCHECK_LE(ready_buffers_.size(), DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT); *buffer = move(ready_buffers_.front()); ready_buffers_.pop_front(); eosr = (*buffer)->eosr(); + DCHECK(!eosr || unused_iomgr_buffers_.empty()) << DebugString(); } - // Update tracking counters. The buffer has now moved from the IoMgr to the - // caller. - reader_->num_ready_buffers_.Add(-1); - reader_->num_used_buffers_.Add(-1); - if (eosr) reader_->num_finished_ranges_.Add(1); - - unique_lock<mutex> reader_lock(reader_->lock_); + // Update tracking counters. The buffer has now moved from the IoMgr to the caller. + if (eosr) reader_->RemoveActiveScanRange(this); + num_buffers_in_reader_.Add(1); + return Status::OK(); +} - DCHECK(reader_->Validate()) << endl << reader_->DebugString(); - if (reader_->state_ == RequestContext::Cancelled) { - reader_->blocked_ranges_.Remove(this); - Cancel(Status::CANCELLED); - CleanUpBuffer(move(*buffer)); - return Status::CANCELLED; - } +void ScanRange::ReturnBuffer(unique_ptr<BufferDescriptor> buffer_desc) { + vector<unique_ptr<BufferDescriptor>> buffers; + buffers.emplace_back(move(buffer_desc)); + AddUnusedBuffers(move(buffers), true); +} - // At this point success is guaranteed so increment counters for returned buffers. - reader_->num_buffers_in_reader_.Add(1); +void ScanRange::AddUnusedBuffers(vector<unique_ptr<BufferDescriptor>>&& buffers, + bool returned) { + DCHECK_GT(buffers.size(), 0); + /// Keep track of whether the range was unblocked in this function. If so, we need + /// to schedule it so it resumes progress. + bool unblocked = false; { - // Check to see if we can re-schedule a blocked range. Note that EnqueueBuffer() - // may have been called after we released 'lock_' above so we need to re-check - // whether the queue is full. unique_lock<mutex> scan_range_lock(lock_); - ++num_buffers_in_reader_; - if (blocked_on_queue_ - && ready_buffers_.size() < DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT - && !eosr_queued_) { - blocked_on_queue_ = false; - // This scan range was blocked and is no longer, add it to the reader - // queue again. - reader_->blocked_ranges_.Remove(this); + if (returned) { + // Buffers were in reader but now aren't. + num_buffers_in_reader_.Add(-buffers.size()); + } + + for (unique_ptr<BufferDescriptor>& buffer : buffers) { + // We should not hold onto the buffers in the following cases: + // 1. the scan range is using external buffers, e.g. cached buffers. + // 2. the scan range is cancelled + // 3. the scan range already hit eosr + // 4. we already have enough buffers to read the remainder of the scan range. 
+ if (external_buffer_tag_ != ExternalBufferTag::NO_BUFFER + || !cancel_status_.ok() + || eosr_queued_ + || unused_iomgr_buffer_bytes_ >= len_ - iomgr_buffer_bytes_returned_) { + CleanUpBuffer(scan_range_lock, move(buffer)); + } else { + unused_iomgr_buffer_bytes_ += buffer->buffer_len(); + unused_iomgr_buffers_.emplace_back(move(buffer)); + if (blocked_on_buffer_) { + blocked_on_buffer_ = false; + unblocked = true; + } + } + } + } + // Must drop the ScanRange lock before acquiring the RequestContext lock. + if (unblocked) { + unique_lock<mutex> reader_lock(reader_->lock_); + // Reader may have been cancelled after we dropped 'scan_range_lock' above. + if (reader_->state_ == RequestContext::Cancelled) { + DCHECK(!cancel_status_.ok()); + } else { reader_->ScheduleScanRange(reader_lock, this); } } - return Status::OK(); } -void ScanRange::ReturnBuffer(unique_ptr<BufferDescriptor> buffer_desc) { - reader_->num_buffers_in_reader_.Add(-1); - { - unique_lock<mutex> scan_range_lock(lock_); - --num_buffers_in_reader_; - CleanUpBufferLocked(scan_range_lock, move(buffer_desc)); +unique_ptr<BufferDescriptor> ScanRange::GetUnusedBuffer( + const unique_lock<mutex>& scan_range_lock) { + DCHECK(scan_range_lock.mutex() == &lock_ && scan_range_lock.owns_lock()); + if (unused_iomgr_buffers_.empty()) return nullptr; + unique_ptr<BufferDescriptor> result = move(unused_iomgr_buffers_.back()); + unused_iomgr_buffers_.pop_back(); + unused_iomgr_buffer_bytes_ -= result->buffer_len(); + return result; +} + +unique_ptr<BufferDescriptor> ScanRange::GetNextUnusedBufferForRange() { + unique_lock<mutex> lock(lock_); + unique_ptr<BufferDescriptor> buffer_desc = GetUnusedBuffer(lock); + if (buffer_desc == nullptr) { + blocked_on_buffer_ = true; + } else { + iomgr_buffer_bytes_returned_ += buffer_desc->buffer_len(); } + return buffer_desc; } -void ScanRange::CleanUpBuffer(unique_ptr<BufferDescriptor> buffer_desc) { - unique_lock<mutex> scan_range_lock(lock_); - CleanUpBufferLocked(scan_range_lock, move(buffer_desc)); +void ScanRange::SetBlockedOnBuffer() { + unique_lock<mutex> lock(lock_); + blocked_on_buffer_ = true; } -void ScanRange::CleanUpBufferLocked( +void ScanRange::CleanUpBuffer( const boost::unique_lock<boost::mutex>& scan_range_lock, unique_ptr<BufferDescriptor> buffer_desc) { DCHECK(scan_range_lock.mutex() == &lock_ && scan_range_lock.owns_lock()); @@ -149,7 +180,7 @@ void ScanRange::CleanUpBufferLocked( DCHECK_EQ(this, buffer_desc->scan_range_); buffer_desc->reader_->FreeBuffer(buffer_desc.get()); - if (all_buffers_returned(scan_range_lock) && num_buffers_in_reader_ == 0) { + if (all_buffers_returned(scan_range_lock) && num_buffers_in_reader_.Load() == 0) { // Close the scan range if there are no more buffers in the reader and no more buffers // will be returned to readers in future. Close() is idempotent so it is ok to call // multiple times during cleanup so long as the range is actually finished. @@ -157,10 +188,33 @@ void ScanRange::CleanUpBufferLocked( } } +void ScanRange::CleanUpBuffers(vector<unique_ptr<BufferDescriptor>>&& buffers) { + unique_lock<mutex> lock(lock_); + for (unique_ptr<BufferDescriptor>& buffer : buffers) CleanUpBuffer(lock, move(buffer)); +} + +void ScanRange::CleanUpUnusedBuffers(const unique_lock<mutex>& scan_range_lock) { + while (!unused_iomgr_buffers_.empty()) { + CleanUpBuffer(scan_range_lock, GetUnusedBuffer(scan_range_lock)); + } +} + void ScanRange::Cancel(const Status& status) { // Cancelling a range that was never started, ignore. 
if (io_mgr_ == nullptr) return; + CancelInternal(status); + reader_->RemoveActiveScanRange(this); +} + +void ScanRange::CancelFromReader(const boost::unique_lock<boost::mutex>& reader_lock, + const Status& status) { + DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock()); + CancelInternal(status); + reader_->RemoveActiveScanRangeLocked(reader_lock, this); +} +void ScanRange::CancelInternal(const Status& status) { + DCHECK(io_mgr_ != nullptr); DCHECK(!status.ok()); { // Grab both locks to make sure that we don't change 'cancel_status_' while other @@ -177,12 +231,13 @@ void ScanRange::Cancel(const Status& status) { /// Clean up 'ready_buffers_' while still holding 'lock_' to prevent other threads /// from seeing inconsistent state. - reader_->num_used_buffers_.Add(-ready_buffers_.size()); - reader_->num_ready_buffers_.Add(-ready_buffers_.size()); while (!ready_buffers_.empty()) { - CleanUpBufferLocked(scan_range_lock, move(ready_buffers_.front())); + CleanUpBuffer(scan_range_lock, move(ready_buffers_.front())); ready_buffers_.pop_front(); } + + /// Clean up buffers that we don't need any more because we won't read any more data. + CleanUpUnusedBuffers(scan_range_lock); } buffer_ready_cv_.NotifyAll(); @@ -197,6 +252,10 @@ string ScanRange::DebugString() const { << " len=" << len_ << " bytes_read=" << bytes_read_ << " cancel_status=" << cancel_status_.GetDetail() << " buffer_queue=" << ready_buffers_.size() + << " num_buffers_in_readers=" << num_buffers_in_reader_.Load() + << " unused_iomgr_buffers=" << unused_iomgr_buffers_.size() + << " unused_iomgr_buffer_bytes=" << unused_iomgr_buffer_bytes_ + << " blocked_on_buffer=" << blocked_on_buffer_ << " hdfs_file=" << exclusive_hdfs_fh_; return ss.str(); } @@ -211,6 +270,27 @@ bool ScanRange::Validate() { LOG(ERROR) << "Cancelled range should not have queued buffers " << DebugString(); return false; } + int64_t unused_iomgr_buffer_bytes = 0; + for (auto& buffer : unused_iomgr_buffers_) + unused_iomgr_buffer_bytes += buffer->buffer_len(); + if (unused_iomgr_buffer_bytes != unused_iomgr_buffer_bytes_) { + LOG(ERROR) << "unused_iomgr_buffer_bytes_ incorrect actual: " + << unused_iomgr_buffer_bytes_ + << " vs. 
expected: " << unused_iomgr_buffer_bytes; + return false; + } + bool is_finished = !cancel_status_.ok() || eosr_queued_; + if (is_finished && !unused_iomgr_buffers_.empty()) { + LOG(ERROR) << "Held onto too many buffers " << unused_iomgr_buffers_.size() + << " bytes: " << unused_iomgr_buffer_bytes_ + << " cancel_status: " << cancel_status_.GetDetail() + << " eosr_queued: " << eosr_queued_; + return false; + } + if (!is_finished && blocked_on_buffer_ && !unused_iomgr_buffers_.empty()) { + LOG(ERROR) << "Blocked despite having buffers: " << DebugString(); + return false; + } return true; } @@ -224,7 +304,7 @@ ScanRange::~ScanRange() { DCHECK(external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) << "Cached buffer was not released."; DCHECK_EQ(0, ready_buffers_.size()); - DCHECK_EQ(0, num_buffers_in_reader_); + DCHECK_EQ(0, num_buffers_in_reader_.Load()); } void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, @@ -268,9 +348,11 @@ void ScanRange::InitInternal(DiskIoMgr* io_mgr, RequestContext* reader) { local_file_ = nullptr; exclusive_hdfs_fh_ = nullptr; bytes_read_ = 0; + unused_iomgr_buffer_bytes_ = 0; + iomgr_buffer_bytes_returned_ = 0; cancel_status_ = Status::OK(); - eosr_queued_= false; - blocked_on_queue_ = false; + eosr_queued_ = false; + blocked_on_buffer_ = false; DCHECK(Validate()) << DebugString(); } @@ -316,9 +398,7 @@ Status ScanRange::Open(bool use_file_handle_cache) { "for file: $1: $2", offset_, file_, GetStrErrMsg())); } } - if (ImpaladMetrics::IO_MGR_NUM_OPEN_FILES != nullptr) { - ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(1L); - } + ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(1L); return Status::OK(); } @@ -370,9 +450,7 @@ void ScanRange::Close() { local_file_ = nullptr; closed_file = true; } - if (closed_file && ImpaladMetrics::IO_MGR_NUM_OPEN_FILES != nullptr) { - ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(-1L); - } + if (closed_file) ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(-1L); } int64_t ScanRange::MaxReadChunkSize() const { @@ -580,12 +658,9 @@ Status ScanRange::ReadFromCache( desc->scan_range_offset_ = 0; desc->eosr_ = true; bytes_read_ = bytes_read; - EnqueueBuffer(reader_lock, move(desc)); - if (reader_->bytes_read_counter_ != nullptr) { - COUNTER_ADD(reader_->bytes_read_counter_, bytes_read); - } + EnqueueReadyBuffer(reader_lock, move(desc)); + COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, bytes_read); *read_succeeded = true; - reader_->num_used_buffers_.Add(1); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/impala/blob/5699b59d/be/src/runtime/tmp-file-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc index 7b00179..3a69c33 100644 --- a/be/src/runtime/tmp-file-mgr.cc +++ b/be/src/runtime/tmp-file-mgr.cc @@ -396,7 +396,10 @@ Status TmpFileMgr::FileGroup::ReadAsync(WriteHandle* handle, MemRange buffer) { BufferOpts::ReadInto(buffer.data(), buffer.len())); read_counter_->Add(1); bytes_read_counter_->Add(buffer.len()); - RETURN_IF_ERROR(io_mgr_->AddScanRange(io_ctx_.get(), handle->read_range_, true)); + bool needs_buffers; + RETURN_IF_ERROR(io_mgr_->StartScanRange( + io_ctx_.get(), handle->read_range_, &needs_buffers)); + DCHECK(!needs_buffers) << "Already provided a buffer"; return Status::OK(); } @@ -522,11 +525,20 @@ TmpFileMgr::WriteHandle::WriteHandle( is_cancelled_(false), write_in_flight_(false) {} +TmpFileMgr::WriteHandle::~WriteHandle() { + DCHECK(!write_in_flight_); + 
DCHECK(read_range_ == nullptr); +} + string TmpFileMgr::WriteHandle::TmpFilePath() const { if (file_ == nullptr) return ""; return file_->path(); } +int64_t TmpFileMgr::WriteHandle::len() const { + return write_range_->len(); +} + Status TmpFileMgr::WriteHandle::Write(DiskIoMgr* io_mgr, RequestContext* io_ctx, File* file, int64_t offset, MemRange buffer, WriteRange::WriteDoneCallback callback) { http://git-wip-us.apache.org/repos/asf/impala/blob/5699b59d/be/src/runtime/tmp-file-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h index 95072ae..55901f4 100644 --- a/be/src/runtime/tmp-file-mgr.h +++ b/be/src/runtime/tmp-file-mgr.h @@ -28,7 +28,6 @@ #include "common/object-pool.h" #include "common/status.h" #include "gen-cpp/Types_types.h" // for TUniqueId -#include "runtime/io/request-ranges.h" #include "util/collection-metrics.h" #include "util/condition-variable.h" #include "util/mem-range.h" @@ -37,6 +36,12 @@ #include "util/spinlock.h" namespace impala { +namespace io { + class DiskIoMgr; + class RequestContext; + class ScanRange; + class WriteRange; +} /// TmpFileMgr provides an abstraction for management of temporary (a.k.a. scratch) files /// on the filesystem and I/O to and from them. TmpFileMgr manages multiple scratch @@ -84,6 +89,7 @@ class TmpFileMgr { /// Needs to be public for TmpFileMgrTest. typedef int DeviceId; + /// Same typedef as io::WriteRange::WriteDoneCallback. typedef std::function<void(const Status&)> WriteDoneCallback; /// Represents a group of temporary files - one per disk with a scratch directory. The @@ -277,10 +283,7 @@ class TmpFileMgr { public: /// The write must be destroyed by passing it to FileGroup - destroying it before /// the write completes is an error. - ~WriteHandle() { - DCHECK(!write_in_flight_); - DCHECK(read_range_ == nullptr); - } + ~WriteHandle(); /// Cancel any in-flight read synchronously. void CancelRead(); @@ -290,7 +293,7 @@ class TmpFileMgr { std::string TmpFilePath() const; /// The length of the write range in bytes. - int64_t len() const { return write_range_->len(); } + int64_t len() const; std::string DebugString(); @@ -305,7 +308,7 @@ class TmpFileMgr { /// failure and 'is_cancelled_' is set to true on failure. Status Write(io::DiskIoMgr* io_mgr, io::RequestContext* io_ctx, File* file, int64_t offset, MemRange buffer, - io::WriteRange::WriteDoneCallback callback) WARN_UNUSED_RESULT; + WriteDoneCallback callback) WARN_UNUSED_RESULT; /// Retry the write after the initial write failed with an error, instead writing to /// 'offset' of 'file'. 'write_in_flight_' must be true before calling. 
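The ReadAsync() change above is one instance of the new two-step start-up: StartScanRange() reports via 'needs_buffers' whether the caller still has to supply I/O mgr buffers before the range can make progress. A hedged sketch of the general calling sequence follows; the AllocateBuffersForRange() arguments are assumptions, since its signature does not appear in this excerpt.

// Hypothetical client-side pattern after this patch (error handling trimmed,
// types as declared elsewhere in the I/O mgr headers).
bool needs_buffers;
RETURN_IF_ERROR(io_mgr->StartScanRange(io_ctx, scan_range, &needs_buffers));
if (needs_buffers) {
  // Only ranges without a client-provided or cached buffer need I/O mgr
  // buffers. Exact signature assumed for illustration.
  RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(io_ctx, scan_range, max_bytes));
}
std::unique_ptr<BufferDescriptor> buffer;
RETURN_IF_ERROR(scan_range->GetNext(&buffer));
// ... consume the data ... then recycle the buffer so the range can continue:
scan_range->ReturnBuffer(std::move(buffer));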
http://git-wip-us.apache.org/repos/asf/impala/blob/5699b59d/be/src/util/bit-util-test.cc ---------------------------------------------------------------------- diff --git a/be/src/util/bit-util-test.cc b/be/src/util/bit-util-test.cc index 5f8d443..6f70727 100644 --- a/be/src/util/bit-util-test.cc +++ b/be/src/util/bit-util-test.cc @@ -258,6 +258,17 @@ TEST(BitUtil, Log2) { EXPECT_EQ(BitUtil::Log2CeilingNonZero64(ULLONG_MAX), 64); } +TEST(BitUtil, RoundToPowerOfTwo) { + EXPECT_EQ(16, BitUtil::RoundUpToPowerOfTwo(9)); + EXPECT_EQ(16, BitUtil::RoundUpToPowerOfTwo(15)); + EXPECT_EQ(16, BitUtil::RoundUpToPowerOfTwo(16)); + EXPECT_EQ(32, BitUtil::RoundUpToPowerOfTwo(17)); + EXPECT_EQ(8, BitUtil::RoundDownToPowerOfTwo(9)); + EXPECT_EQ(8, BitUtil::RoundDownToPowerOfTwo(15)); + EXPECT_EQ(16, BitUtil::RoundDownToPowerOfTwo(16)); + EXPECT_EQ(16, BitUtil::RoundDownToPowerOfTwo(17)); +} + TEST(BitUtil, RoundUpToPowerOf2) { EXPECT_EQ(BitUtil::RoundUpToPowerOf2(7, 8), 8); EXPECT_EQ(BitUtil::RoundUpToPowerOf2(8, 8), 8); http://git-wip-us.apache.org/repos/asf/impala/blob/5699b59d/be/src/util/bit-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/bit-util.h b/be/src/util/bit-util.h index 5c9a29b..8a65509 100644 --- a/be/src/util/bit-util.h +++ b/be/src/util/bit-util.h @@ -98,6 +98,12 @@ class BitUtil { return v; } + /// Returns the largest power of two <= v. + static inline int64_t RoundDownToPowerOfTwo(int64_t v) { + int64_t v_rounded_up = RoundUpToPowerOfTwo(v); + return v_rounded_up == v ? v : v_rounded_up / 2; + } + /// Returns 'value' rounded up to the nearest multiple of 'factor' when factor is /// a power of two static inline int64_t RoundUpToPowerOf2(int64_t value, int64_t factor) { @@ -105,7 +111,7 @@ class BitUtil { return (value + (factor - 1)) & ~(factor - 1); } - static inline int RoundDownToPowerOf2(int value, int factor) { + static inline int64_t RoundDownToPowerOf2(int64_t value, int64_t factor) { DCHECK((factor > 0) && ((factor & (factor - 1)) == 0)); return value & ~(factor - 1); }
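The new RoundDownToPowerOfTwo() complements the RoundUpToPowerOfTwo() that the (now removed) AllocBuffer() in request-context.cc used for buffer sizing. Below is a self-contained sketch: RoundUpToPowerOfTwo() is re-implemented with the usual bit-twiddling, since its body is not shown in this excerpt, and ChooseBufferSize() is a hypothetical helper mirroring the clamp-then-round logic of the removed AllocBuffer().

#include <algorithm>
#include <cassert>
#include <cstdint>

// Assumed definition; BitUtil::RoundUpToPowerOfTwo()'s body is not in this diff.
int64_t RoundUpToPowerOfTwo(int64_t v) {
  --v;
  v |= v >> 1; v |= v >> 2; v |= v >> 4;
  v |= v >> 8; v |= v >> 16; v |= v >> 32;
  return v + 1;
}

// Matches the new BitUtil::RoundDownToPowerOfTwo() added by this patch:
// the largest power of two <= v.
int64_t RoundDownToPowerOfTwo(int64_t v) {
  int64_t rounded_up = RoundUpToPowerOfTwo(v);
  return rounded_up == v ? v : rounded_up / 2;
}

// Hypothetical buffer sizing in the spirit of the removed AllocBuffer():
// clamp into [min_size, max_size] (both powers of two), then round up.
int64_t ChooseBufferSize(int64_t requested, int64_t min_size, int64_t max_size) {
  return RoundUpToPowerOfTwo(std::min(max_size, std::max(min_size, requested)));
}

int main() {
  assert(RoundDownToPowerOfTwo(9) == 8);    // mirrors the new unit test above
  assert(RoundDownToPowerOfTwo(16) == 16);
  assert(ChooseBufferSize(100 * 1024, 8 * 1024, 8 * 1024 * 1024) == 128 * 1024);
  return 0;
}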