http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/io/handle-cache.inline.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/handle-cache.inline.h b/be/src/runtime/io/handle-cache.inline.h new file mode 100644 index 0000000..10db49e --- /dev/null +++ b/be/src/runtime/io/handle-cache.inline.h @@ -0,0 +1,232 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <tuple> + +#include "runtime/io/handle-cache.h" +#include "util/hash-util.h" +#include "util/time.h" + +#ifndef IMPALA_RUNTIME_DISK_IO_MGR_HANDLE_CACHE_INLINE_H +#define IMPALA_RUNTIME_DISK_IO_MGR_HANDLE_CACHE_INLINE_H + +namespace impala { +namespace io { + +HdfsFileHandle::HdfsFileHandle(const hdfsFS& fs, const char* fname, + int64_t mtime) + : fs_(fs), hdfs_file_(hdfsOpenFile(fs, fname, O_RDONLY, 0, 0, 0)), mtime_(mtime) { + ImpaladMetrics::IO_MGR_NUM_CACHED_FILE_HANDLES->Increment(1L); + VLOG_FILE << "hdfsOpenFile() file=" << fname << " fid=" << hdfs_file_; +} + +HdfsFileHandle::~HdfsFileHandle() { + if (hdfs_file_ != nullptr && fs_ != nullptr) { + ImpaladMetrics::IO_MGR_NUM_CACHED_FILE_HANDLES->Increment(-1L); + VLOG_FILE << "hdfsCloseFile() fid=" << hdfs_file_; + hdfsCloseFile(fs_, hdfs_file_); + } + fs_ = nullptr; + hdfs_file_ = nullptr; +} + +template <size_t NUM_PARTITIONS> + FileHandleCache<NUM_PARTITIONS>::FileHandleCache(size_t capacity, + uint64_t unused_handle_timeout_secs) + : unused_handle_timeout_secs_(unused_handle_timeout_secs) { + DCHECK_GT(NUM_PARTITIONS, 0); + size_t remainder = capacity % NUM_PARTITIONS; + size_t base_capacity = capacity / NUM_PARTITIONS; + size_t partition_capacity = (remainder > 0 ? 
base_capacity + 1 : base_capacity); + for (FileHandleCachePartition& p : cache_partitions_) { + p.size = 0; + p.capacity = partition_capacity; + } +} + +template <size_t NUM_PARTITIONS> +FileHandleCache<NUM_PARTITIONS>::LruListEntry::LruListEntry( + typename MapType::iterator map_entry_in) + : map_entry(map_entry_in), timestamp_seconds(MonotonicSeconds()) {} + +template <size_t NUM_PARTITIONS> +FileHandleCache<NUM_PARTITIONS>::~FileHandleCache() { + shut_down_promise_.Set(true); + if (eviction_thread_ != nullptr) eviction_thread_->Join(); +} + +template <size_t NUM_PARTITIONS> +Status FileHandleCache<NUM_PARTITIONS>::Init() { + return Thread::Create("disk-io-mgr-handle-cache", "File Handle Timeout", + &FileHandleCache<NUM_PARTITIONS>::EvictHandlesLoop, this, &eviction_thread_); +} + +template <size_t NUM_PARTITIONS> +HdfsFileHandle* FileHandleCache<NUM_PARTITIONS>::GetFileHandle( + const hdfsFS& fs, std::string* fname, int64_t mtime, bool require_new_handle, + bool* cache_hit) { + // Hash the key and get the appropriate partition + int index = HashUtil::Hash(fname->data(), fname->size(), 0) % NUM_PARTITIONS; + FileHandleCachePartition& p = cache_partitions_[index]; + boost::lock_guard<SpinLock> g(p.lock); + pair<typename MapType::iterator, typename MapType::iterator> range = + p.cache.equal_range(*fname); + + // If this requires a new handle, skip to the creation codepath. Otherwise, + // find an unused entry with the same mtime + FileHandleEntry* ret_elem = nullptr; + if (!require_new_handle) { + while (range.first != range.second) { + FileHandleEntry* elem = &range.first->second; + if (!elem->in_use && elem->fh->mtime() == mtime) { + // This element is currently in the lru_list, which means that lru_entry must + // be an iterator pointing into the lru_list. + DCHECK(elem->lru_entry != p.lru_list.end()); + // Remove the element from the lru_list and designate that it is not on + // the lru_list by resetting its iterator to point to the end of the list. + p.lru_list.erase(elem->lru_entry); + elem->lru_entry = p.lru_list.end(); + ret_elem = elem; + *cache_hit = true; + break; + } + ++range.first; + } + } + + // There was no free entry, or the caller asked for a new handle + if (!ret_elem) { + *cache_hit = false; + // Create a new entry and move it into the map + HdfsFileHandle* new_fh = new HdfsFileHandle(fs, fname->data(), mtime); + if (!new_fh->ok()) { + delete new_fh; + return nullptr; + } + FileHandleEntry entry(new_fh, p.lru_list); + typename MapType::iterator new_it = p.cache.emplace_hint(range.second, + *fname, std::move(entry)); + ret_elem = &new_it->second; + ++p.size; + if (p.size > p.capacity) EvictHandles(p); + } + + DCHECK(ret_elem->fh.get() != nullptr); + DCHECK(!ret_elem->in_use); + ret_elem->in_use = true; + ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(1L); + return ret_elem->fh.get(); +} + +template <size_t NUM_PARTITIONS> +void FileHandleCache<NUM_PARTITIONS>::ReleaseFileHandle(std::string* fname, + HdfsFileHandle* fh, bool destroy_handle) { + DCHECK(fh != nullptr); + // Hash the key and get the appropriate partition + int index = HashUtil::Hash(fname->data(), fname->size(), 0) % NUM_PARTITIONS; + FileHandleCachePartition& p = cache_partitions_[index]; + boost::lock_guard<SpinLock> g(p.lock); + pair<typename MapType::iterator, typename MapType::iterator> range = + p.cache.equal_range(*fname); + + // TODO: This can be optimized by maintaining some state in the file handle about + // its location in the map.
+ typename MapType::iterator release_it = range.first; + while (release_it != range.second) { + FileHandleEntry* elem = &release_it->second; + if (elem->fh.get() == fh) break; + ++release_it; + } + DCHECK(release_it != range.second); + + // This file handle is no longer referenced + FileHandleEntry* release_elem = &release_it->second; + DCHECK(release_elem->in_use); + release_elem->in_use = false; + ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(-1L); + if (destroy_handle) { + --p.size; + p.cache.erase(release_it); + return; + } + // Hdfs can use some memory for readahead buffering. Calling unbuffer reduces + // this buffering so that the file handle takes up less memory when in the cache. + // If unbuffering is not supported, then hdfsUnbufferFile() will return a non-zero + // return code, and we close the file handle and remove it from the cache. + if (hdfsUnbufferFile(release_elem->fh->file()) == 0) { + // This FileHandleEntry must not be in the lru list already, because it was + // in use. Verify this by checking that the lru_entry is pointing to the end, + // which cannot be true for any element in the lru list. + DCHECK(release_elem->lru_entry == p.lru_list.end()); + // Add this to the lru list, establishing links in both directions. + // The FileHandleEntry has an iterator to the LruListEntry and the + // LruListEntry has an iterator to the location of the FileHandleEntry in + // the cache. + release_elem->lru_entry = p.lru_list.emplace(p.lru_list.end(), release_it); + if (p.size > p.capacity) EvictHandles(p); + } else { + VLOG_FILE << "FS does not support file handle unbuffering, closing file=" + << *fname; + --p.size; + p.cache.erase(release_it); + } +} + +template <size_t NUM_PARTITIONS> +void FileHandleCache<NUM_PARTITIONS>::EvictHandlesLoop() { + while (true) { + for (FileHandleCachePartition& p : cache_partitions_) { + boost::lock_guard<SpinLock> g(p.lock); + EvictHandles(p); + } + // This Get() will time out until shutdown, when the promise is set. + bool timed_out; + shut_down_promise_.Get(EVICT_HANDLES_PERIOD_MS, &timed_out); + if (!timed_out) break; + } + // The promise must be set to true. + DCHECK(shut_down_promise_.IsSet()); + DCHECK(shut_down_promise_.Get()); +} + +template <size_t NUM_PARTITIONS> +void FileHandleCache<NUM_PARTITIONS>::EvictHandles( + FileHandleCache<NUM_PARTITIONS>::FileHandleCachePartition& p) { + uint64_t now = MonotonicSeconds(); + uint64_t oldest_allowed_timestamp = + now > unused_handle_timeout_secs_ ? now - unused_handle_timeout_secs_ : 0; + while (p.lru_list.size() > 0) { + // Peek at the oldest element + LruListEntry oldest_entry = p.lru_list.front(); + typename MapType::iterator oldest_entry_map_it = oldest_entry.map_entry; + uint64_t oldest_entry_timestamp = oldest_entry.timestamp_seconds; + // If the oldest element does not need to be aged out and the cache is not over + // capacity, then we are done and there is nothing to evict. + if (p.size <= p.capacity && (unused_handle_timeout_secs_ == 0 || + oldest_entry_timestamp >= oldest_allowed_timestamp)) { + return; + } + // Evict the oldest element + DCHECK(!oldest_entry_map_it->second.in_use); + p.cache.erase(oldest_entry_map_it); + p.lru_list.pop_front(); + --p.size; + } +} +} +} +#endif
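For reference, a minimal sketch of the calling pattern implied by the API above: a handle is borrowed with GetFileHandle() and must be paired with a ReleaseFileHandle() on the same file name. The cache object, filesystem handle, path, and mtime below are illustrative stand-ins, not names from this patch; the real caller is the scan-range code later in this change.

// Sketch only, assuming a cache owned and Init()'ed by the IoMgr as above.
FileHandleCache<16> cache(/* capacity */ 1000, /* unused_handle_timeout_secs */ 300);
bool cache_hit;
std::string fname = "/warehouse/t1/f0.parq";  // hypothetical file name
// A hit requires an unused cached handle with a matching mtime, so a file that
// was overwritten in place (new mtime) never matches a stale cached handle.
HdfsFileHandle* fh = cache.GetFileHandle(my_fs, &fname, mtime,
    /* require_new_handle */ false, &cache_hit);
if (fh == nullptr) { /* hdfsOpenFile() failed; surface an error */ }
// ... issue hdfsRead()/hdfsPread() calls against fh->file() ...
// destroy_handle=true closes the handle instead of recycling it; a failed
// hdfsUnbufferFile() inside ReleaseFileHandle() also closes it.
cache.ReleaseFileHandle(&fname, fh, /* destroy_handle */ false);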
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/io/request-context.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/request-context.cc b/be/src/runtime/io/request-context.cc new file mode 100644 index 0000000..287f53a --- /dev/null +++ b/be/src/runtime/io/request-context.cc @@ -0,0 +1,293 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/io/disk-io-mgr-internal.h" + +#include "common/names.h" + +using namespace impala; +using namespace impala::io; + +void RequestContext::Cancel(const Status& status) { + DCHECK(!status.ok()); + + // Callbacks are collected in this vector and invoked while no lock is held. + vector<WriteRange::WriteDoneCallback> write_callbacks; + { + lock_guard<mutex> lock(lock_); + DCHECK(Validate()) << endl << DebugString(); + + // Already being cancelled + if (state_ == RequestContext::Cancelled) return; + + DCHECK(status_.ok()); + status_ = status; + + // The reader will be put into a cancelled state until all cleanup is complete. + state_ = RequestContext::Cancelled; + + // Cancel all scan ranges for this reader. Each range could be on one of + // four queues. + for (int i = 0; i < disk_states_.size(); ++i) { + RequestContext::PerDiskState& state = disk_states_[i]; + RequestRange* range = NULL; + while ((range = state.in_flight_ranges()->Dequeue()) != NULL) { + if (range->request_type() == RequestType::READ) { + static_cast<ScanRange*>(range)->Cancel(status); + } else { + DCHECK(range->request_type() == RequestType::WRITE); + write_callbacks.push_back(static_cast<WriteRange*>(range)->callback_); + } + } + + ScanRange* scan_range; + while ((scan_range = state.unstarted_scan_ranges()->Dequeue()) != NULL) { + scan_range->Cancel(status); + } + WriteRange* write_range; + while ((write_range = state.unstarted_write_ranges()->Dequeue()) != NULL) { + write_callbacks.push_back(write_range->callback_); + } + } + + ScanRange* range = NULL; + while ((range = ready_to_start_ranges_.Dequeue()) != NULL) { + range->Cancel(status); + } + while ((range = blocked_ranges_.Dequeue()) != NULL) { + range->Cancel(status); + } + while ((range = cached_ranges_.Dequeue()) != NULL) { + range->Cancel(status); + } + + // Schedule reader on all disks. The disks will notice it is cancelled and do any + // required cleanup. + for (int i = 0; i < disk_states_.size(); ++i) { + RequestContext::PerDiskState& state = disk_states_[i]; + state.ScheduleContext(this, i); + } + } + + for (const WriteRange::WriteDoneCallback& write_callback: write_callbacks) { + write_callback(status_); + } + + // Signal reader and unblock the GetNext/Read thread. That read will fail with + // a cancelled status.
+ ready_to_start_ranges_cv_.NotifyAll(); +} + +void RequestContext::CancelAndMarkInactive() { + Cancel(Status::CANCELLED); + + boost::unique_lock<boost::mutex> l(lock_); + DCHECK_NE(state_, Inactive); + DCHECK(Validate()) << endl << DebugString(); + + // Wait until the ranges finish up. + while (num_disks_with_ranges_ > 0) disks_complete_cond_var_.Wait(l); + + // Validate that no buffers were leaked from this context. + DCHECK_EQ(num_buffers_in_reader_.Load(), 0) << endl << DebugString(); + DCHECK_EQ(num_used_buffers_.Load(), 0) << endl << DebugString(); + DCHECK(Validate()) << endl << DebugString(); + state_ = Inactive; +} + +void RequestContext::AddRequestRange( + RequestRange* range, bool schedule_immediately) { + // DCHECK(lock_.is_locked()); // TODO: boost should have this API + RequestContext::PerDiskState& state = disk_states_[range->disk_id()]; + if (state.done()) { + DCHECK_EQ(state.num_remaining_ranges(), 0); + state.set_done(false); + ++num_disks_with_ranges_; + } + + bool schedule_context; + if (range->request_type() == RequestType::READ) { + ScanRange* scan_range = static_cast<ScanRange*>(range); + if (schedule_immediately) { + ScheduleScanRange(scan_range); + } else { + state.unstarted_scan_ranges()->Enqueue(scan_range); + num_unstarted_scan_ranges_.Add(1); + } + // If next_scan_range_to_start is NULL, schedule this RequestContext so that it will + // be set. If it's not NULL, this context will be scheduled when GetNextRange() is + // invoked. + schedule_context = state.next_scan_range_to_start() == NULL; + } else { + DCHECK(range->request_type() == RequestType::WRITE); + DCHECK(!schedule_immediately); + WriteRange* write_range = static_cast<WriteRange*>(range); + state.unstarted_write_ranges()->Enqueue(write_range); + + // ScheduleContext() has no effect if the context is already scheduled, + // so this is safe. + schedule_context = true; + } + + if (schedule_context) state.ScheduleContext(this, range->disk_id()); + ++state.num_remaining_ranges(); +} + +RequestContext::RequestContext( + DiskIoMgr* parent, int num_disks, MemTracker* tracker) + : parent_(parent), mem_tracker_(tracker), disk_states_(num_disks) {} + +// Dumps out request context information. Lock should be taken by caller +string RequestContext::DebugString() const { + stringstream ss; + ss << endl << " RequestContext: " << (void*)this << " (state="; + if (state_ == RequestContext::Inactive) ss << "Inactive"; + if (state_ == RequestContext::Cancelled) ss << "Cancelled"; + if (state_ == RequestContext::Active) ss << "Active"; + if (state_ != RequestContext::Inactive) { + ss << " status_=" << (status_.ok() ? 
"OK" : status_.GetDetail()) + << " #ready_buffers=" << num_ready_buffers_.Load() + << " #used_buffers=" << num_used_buffers_.Load() + << " #num_buffers_in_reader=" << num_buffers_in_reader_.Load() + << " #finished_scan_ranges=" << num_finished_ranges_.Load() + << " #disk_with_ranges=" << num_disks_with_ranges_ + << " #disks=" << num_disks_with_ranges_; + for (int i = 0; i < disk_states_.size(); ++i) { + ss << endl << " " << i << ": " + << "is_on_queue=" << disk_states_[i].is_on_queue() + << " done=" << disk_states_[i].done() + << " #num_remaining_scan_ranges=" << disk_states_[i].num_remaining_ranges() + << " #in_flight_ranges=" << disk_states_[i].in_flight_ranges()->size() + << " #unstarted_scan_ranges=" << disk_states_[i].unstarted_scan_ranges()->size() + << " #unstarted_write_ranges=" + << disk_states_[i].unstarted_write_ranges()->size() + << " #reading_threads=" << disk_states_[i].num_threads_in_op(); + } + } + ss << ")"; + return ss.str(); +} + +bool RequestContext::Validate() const { + if (state_ == RequestContext::Inactive) { + LOG(WARNING) << "state_ == RequestContext::Inactive"; + return false; + } + + if (num_used_buffers_.Load() < 0) { + LOG(WARNING) << "num_used_buffers_ < 0: #used=" << num_used_buffers_.Load(); + return false; + } + + if (num_ready_buffers_.Load() < 0) { + LOG(WARNING) << "num_ready_buffers_ < 0: #used=" << num_ready_buffers_.Load(); + return false; + } + + int total_unstarted_ranges = 0; + for (int i = 0; i < disk_states_.size(); ++i) { + const PerDiskState& state = disk_states_[i]; + bool on_queue = state.is_on_queue(); + int num_reading_threads = state.num_threads_in_op(); + + total_unstarted_ranges += state.unstarted_scan_ranges()->size(); + + if (num_reading_threads < 0) { + LOG(WARNING) << "disk_id=" << i + << "state.num_threads_in_read < 0: #threads=" + << num_reading_threads; + return false; + } + + if (state_ != RequestContext::Cancelled) { + if (state.unstarted_scan_ranges()->size() + state.in_flight_ranges()->size() > + state.num_remaining_ranges()) { + LOG(WARNING) << "disk_id=" << i + << " state.unstarted_ranges.size() + state.in_flight_ranges.size()" + << " > state.num_remaining_ranges:" + << " #unscheduled=" << state.unstarted_scan_ranges()->size() + << " #in_flight=" << state.in_flight_ranges()->size() + << " #remaining=" << state.num_remaining_ranges(); + return false; + } + + // If we have an in_flight range, the reader must be on the queue or have a + // thread actively reading for it. + if (!state.in_flight_ranges()->empty() && !on_queue && num_reading_threads == 0) { + LOG(WARNING) << "disk_id=" << i + << " reader has inflight ranges but is not on the disk queue." + << " #in_flight_ranges=" << state.in_flight_ranges()->size() + << " #reading_threads=" << num_reading_threads + << " on_queue=" << on_queue; + return false; + } + + if (state.done() && num_reading_threads > 0) { + LOG(WARNING) << "disk_id=" << i + << " state set to done but there are still threads working." + << " #reading_threads=" << num_reading_threads; + return false; + } + } else { + // Is Cancelled + if (!state.in_flight_ranges()->empty()) { + LOG(WARNING) << "disk_id=" << i + << "Reader cancelled but has in flight ranges."; + return false; + } + if (!state.unstarted_scan_ranges()->empty()) { + LOG(WARNING) << "disk_id=" << i + << "Reader cancelled but has unstarted ranges."; + return false; + } + } + + if (state.done() && on_queue) { + LOG(WARNING) << "disk_id=" << i + << " state set to done but the reader is still on the disk queue." 
+ << " state.done=true and state.is_on_queue=true"; + return false; + } + } + + if (state_ != RequestContext::Cancelled) { + if (total_unstarted_ranges != num_unstarted_scan_ranges_.Load()) { + LOG(WARNING) << "total_unstarted_ranges=" << total_unstarted_ranges + << " sum_in_states=" << num_unstarted_scan_ranges_.Load(); + return false; + } + } else { + if (!ready_to_start_ranges_.empty()) { + LOG(WARNING) << "Reader cancelled but has ready to start ranges."; + return false; + } + if (!blocked_ranges_.empty()) { + LOG(WARNING) << "Reader cancelled but has blocked ranges."; + return false; + } + } + + return true; +} + +void RequestContext::PerDiskState::ScheduleContext( + RequestContext* context, int disk_id) { + if (!is_on_queue_ && !done_) { + is_on_queue_ = true; + context->parent_->disk_queues_[disk_id]->EnqueueContext(context); + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/io/request-context.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/request-context.h b/be/src/runtime/io/request-context.h new file mode 100644 index 0000000..9807805 --- /dev/null +++ b/be/src/runtime/io/request-context.h @@ -0,0 +1,403 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_RUNTIME_IO_REQUEST_CONTEXT_H +#define IMPALA_RUNTIME_IO_REQUEST_CONTEXT_H + +#include "runtime/io/disk-io-mgr.h" +#include "util/condition-variable.h" + +namespace impala { +namespace io { +/// A request context is used to group together I/O requests belonging to a client of the +/// I/O manager for management and scheduling. For most I/O manager clients it is an +/// opaque pointer, but some clients may need to include this header, e.g. to make the +/// unique_ptr<DiskIoRequestContext> destructor work correctly. +/// +/// Implementation Details +/// ====================== +/// This object maintains a lot of state that is carefully synchronized. The context +/// maintains state across all disks as well as per disk state. +/// The unit for an IO request is a RequestRange, which may be a ScanRange or a +/// WriteRange. +/// A scan range for the reader is on one of five states: +/// 1) PerDiskState's unstarted_ranges: This range has only been queued +/// and nothing has been read from it. +/// 2) RequestContext's ready_to_start_ranges_: This range is about to be started. +/// As soon as the reader picks it up, it will move to the in_flight_ranges +/// queue. +/// 3) PerDiskState's in_flight_ranges: This range is being processed and will +/// be read from the next time a disk thread picks it up in GetNextRequestRange() +/// 4) ScanRange's outgoing ready buffers is full. We can't read for this range +/// anymore. 
We need the caller to pull a buffer off, which will put this range in +/// the in_flight_ranges queue. These ranges are in the RequestContext's +/// blocked_ranges_ queue. +/// 5) ScanRange is cached and in the cached_ranges_ queue. +// +/// If the scan range is read and does not get blocked on the outgoing queue, the +/// transitions are: 1 -> 2 -> 3. +/// If the scan range does get blocked, the transitions are +/// 1 -> 2 -> 3 -> (4 -> 3)* +// +/// In the case of a cached scan range, the range is immediately put in cached_ranges_. +/// When the caller asks for the next range to process, we first pull ranges from +/// the cached_ranges_ queue. If the range was cached, the range is removed and +/// done (ranges are either entirely cached or not at all). If the cached read attempt +/// fails, we put the range in state 1. +// +/// A write range for a context may be in one of two lists: +/// 1) unstarted_write_ranges_ : Ranges that have been queued but not processed. +/// 2) in_flight_ranges_: The write range is ready to be processed by the next disk thread +/// that picks it up in GetNextRequestRange(). +// +/// AddWriteRange() adds WriteRanges for a disk. +/// It is the responsibility of the client to pin the data to be written via a WriteRange +/// in memory. After a WriteRange has been written, a callback is invoked to inform the +/// client that the write has completed. +// +/// An important assumption is that a write does not exceed the maximum read size and that +/// the entire range is written when the write request is handled. (In other words, writes +/// are not broken up.) +// +/// When a RequestContext is processed by a disk thread in GetNextRequestRange(), +/// a write range is always removed from the list of unstarted write ranges and appended +/// to the in_flight_ranges_ queue. This is done to alternate reads and writes - a read +/// that is scheduled (by calling GetNextRange()) is always followed by a write (if one +/// exists). And since at most one WriteRange can be present in in_flight_ranges_ at any +/// time (once a write range is returned from GetNextRequestRange() it is completed and +/// not re-enqueued), a scan range scheduled via a call to GetNextRange() can be queued up +/// behind at most one write range. +class RequestContext { + public: + ~RequestContext() { DCHECK_EQ(state_, Inactive) << "Must be unregistered."; } + + private: + DISALLOW_COPY_AND_ASSIGN(RequestContext); + friend class DiskIoMgr; + friend class ScanRange; + + class PerDiskState; + + enum State { + /// Reader is initialized and maps to a client + Active, + + /// Reader is in the process of being cancelled. Cancellation is coordinated between + /// different threads and when they are all complete, the reader context is moved to + /// the inactive state. + Cancelled, + + /// Reader context does not map to a client. Accessing memory in this context + /// is invalid (i.e. it is equivalent to a dangling pointer). + Inactive, + }; + + RequestContext(DiskIoMgr* parent, int num_disks, MemTracker* tracker); + + /// Decrements the number of active disks for this reader. If the disk count + /// goes to 0, the disk complete condition variable is signaled. + /// Reader lock must be taken before this call.
+ void DecrementDiskRefCount() { + // boost doesn't let us dcheck that the reader lock is taken + DCHECK_GT(num_disks_with_ranges_, 0); + if (--num_disks_with_ranges_ == 0) { + disks_complete_cond_var_.NotifyAll(); + } + DCHECK(Validate()) << std::endl << DebugString(); + } + + /// Reader & Disk Scheduling: Readers that currently can't do work are not on + /// the disk's queue. These readers are ones that don't have any ranges in the + /// in_flight_queue AND have not prepared a range by setting next_range_to_start. + /// The rule to make sure readers are scheduled correctly is to ensure anytime a + /// range is put on the in_flight_queue or anytime next_range_to_start is set to + /// NULL, the reader is scheduled. + + /// Adds range to in_flight_ranges, scheduling this reader on the disk threads + /// if necessary. + /// Reader lock must be taken before this. + void ScheduleScanRange(ScanRange* range) { + DCHECK_EQ(state_, Active); + DCHECK(range != NULL); + RequestContext::PerDiskState& state = disk_states_[range->disk_id()]; + state.in_flight_ranges()->Enqueue(range); + state.ScheduleContext(this, range->disk_id()); + } + + /// Cancels the context with status code 'status'. + void Cancel(const Status& status); + + /// Cancel the context if not already cancelled, wait for all scan ranges to finish + /// and mark the context as inactive, after which it cannot be used. + void CancelAndMarkInactive(); + + /// Adds request range to disk queue for this request context. Currently, + /// schedule_immediately must be false if the RequestRange is a write range. + void AddRequestRange(RequestRange* range, bool schedule_immediately); + + /// Validates invariants of reader. Reader lock must be taken beforehand. + bool Validate() const; + + /// Dumps out reader information. Lock should be taken by the caller. + std::string DebugString() const; + + /// Parent object + DiskIoMgr* const parent_; + + /// Memory used for this reader. This is unowned by this object. + MemTracker* const mem_tracker_; + + /// Total bytes read for this reader + RuntimeProfile::Counter* bytes_read_counter_ = nullptr; + + /// Total time spent in hdfs reading + RuntimeProfile::Counter* read_timer_ = nullptr; + + /// Number of active read threads + RuntimeProfile::Counter* active_read_thread_counter_ = nullptr; + + /// Disk access bitmap. The counter's bit[i] is set if disk id i has been accessed. + /// TODO: we can only support up to 64 disks with this bitmap but it lets us use a + /// builtin atomic instruction. Probably good enough for now. + RuntimeProfile::Counter* disks_accessed_bitmap_ = nullptr; + + /// Total number of bytes read locally, updated at end of each range scan + AtomicInt64 bytes_read_local_{0}; + + /// Total number of bytes read via short circuit read, updated at end of each range scan + AtomicInt64 bytes_read_short_circuit_{0}; + + /// Total number of bytes read from data node cache, updated at end of each range scan + AtomicInt64 bytes_read_dn_cache_{0}; + + /// Total number of bytes from remote reads that were expected to be local. + AtomicInt64 unexpected_remote_bytes_{0}; + + /// The number of buffers that have been returned to the reader (via GetNext) that the + /// reader has not returned. Only included for debugging and diagnostics. + AtomicInt32 num_buffers_in_reader_{0}; + + /// The number of scan ranges that have been completed for this reader. + AtomicInt32 num_finished_ranges_{0}; + + /// The number of scan ranges that required a remote read, updated at the end of each + /// range scan.
Only used for diagnostics. + AtomicInt32 num_remote_ranges_{0}; + + /// The total number of scan ranges that have not been started. Only used for + /// diagnostics. This is the sum of all unstarted_scan_ranges across all disks. + AtomicInt32 num_unstarted_scan_ranges_{0}; + + /// Total number of file handle opens where the file handle was present in the cache + AtomicInt32 cached_file_handles_hit_count_{0}; + + /// Total number of file handle opens where the file handle was not in the cache + AtomicInt32 cached_file_handles_miss_count_{0}; + + /// The number of buffers that are being used for this reader. This is the sum + /// of all buffers in ScanRange queues and buffers currently being read into (i.e. about + /// to be queued). This includes both IOMgr-allocated buffers and client-provided + /// buffers. + AtomicInt32 num_used_buffers_{0}; + + /// The total number of ready buffers across all ranges. Ready buffers are buffers + /// that have been read from disk but not retrieved by the caller. + /// This is the sum of all queued buffers in all ranges for this reader context. + AtomicInt32 num_ready_buffers_{0}; + + /// All fields below are accessed by multiple threads and the lock needs to be + /// taken before accessing them. Must be acquired before ScanRange::lock_ if both + /// are held simultaneously. + boost::mutex lock_; + + /// Current state of the reader + State state_ = Active; + + /// Status of this reader. Set to non-ok if cancelled. + Status status_; + + /// The number of disks with scan ranges remaining (always equal to the sum of + /// disks with ranges). + int num_disks_with_ranges_ = 0; + + /// This is the list of ranges that are expected to be cached on the DN. + /// When the reader asks for a new range (GetNextScanRange()), we first + /// return ranges from this list. + InternalQueue<ScanRange> cached_ranges_; + + /// A list of ranges that should be returned in subsequent calls to + /// GetNextRange. + /// There is a trade-off with when to populate this list. Populating it on + /// demand means consumers need to wait (happens in DiskIoMgr::GetNextRange()). + /// Populating it preemptively means we make worse scheduling decisions. + /// We currently populate one range per disk. + /// TODO: think about this some more. + InternalQueue<ScanRange> ready_to_start_ranges_; + ConditionVariable ready_to_start_ranges_cv_; // used with lock_ + + /// Ranges that are blocked due to back pressure on outgoing buffers. + InternalQueue<ScanRange> blocked_ranges_; + + /// Condition variable for UnregisterContext() to wait for all disks to complete + ConditionVariable disks_complete_cond_var_; + + /// Struct containing state per disk. See comments in the disk read loop on how + /// they are used. 
+ class PerDiskState { + public: + bool done() const { return done_; } + void set_done(bool b) { done_ = b; } + + int num_remaining_ranges() const { return num_remaining_ranges_; } + int& num_remaining_ranges() { return num_remaining_ranges_; } + + ScanRange* next_scan_range_to_start() { return next_scan_range_to_start_; } + void set_next_scan_range_to_start(ScanRange* range) { + next_scan_range_to_start_ = range; + } + + /// We need to have a memory barrier to prevent this load from being reordered + /// with num_threads_in_op(), since these variables are set without the reader + /// lock taken + bool is_on_queue() const { + bool b = is_on_queue_; + __sync_synchronize(); + return b; + } + + int num_threads_in_op() const { + int v = num_threads_in_op_.Load(); + // TODO: determine whether this barrier is necessary for any callsites. + AtomicUtil::MemoryBarrier(); + return v; + } + + const InternalQueue<ScanRange>* unstarted_scan_ranges() const { + return &unstarted_scan_ranges_; + } + const InternalQueue<WriteRange>* unstarted_write_ranges() const { + return &unstarted_write_ranges_; + } + const InternalQueue<RequestRange>* in_flight_ranges() const { + return &in_flight_ranges_; + } + + InternalQueue<ScanRange>* unstarted_scan_ranges() { return &unstarted_scan_ranges_; } + InternalQueue<WriteRange>* unstarted_write_ranges() { + return &unstarted_write_ranges_; + } + InternalQueue<RequestRange>* in_flight_ranges() { return &in_flight_ranges_; } + + /// Schedules the request context on this disk if it's not already on the queue. + /// Context lock must be taken before this. + void ScheduleContext(RequestContext* context, int disk_id); + + /// Increment the ref count on reader. We need to track the number of threads per + /// reader per disk that are in the unlocked hdfs read code section. This is updated + /// by multiple threads without a lock so we need to use an atomic int. + void IncrementRequestThreadAndDequeue() { + num_threads_in_op_.Add(1); + is_on_queue_ = false; + } + + void DecrementRequestThread() { num_threads_in_op_.Add(-1); } + + /// Decrement request thread count and do final cleanup if this is the last + /// thread. RequestContext lock must be taken before this. + void DecrementRequestThreadAndCheckDone(RequestContext* context) { + num_threads_in_op_.Add(-1); // Also acts as a barrier. + if (!is_on_queue_ && num_threads_in_op_.Load() == 0 && !done_) { + // This thread is the last one for this reader on this disk, do final cleanup + context->DecrementDiskRefCount(); + done_ = true; + } + } + + private: + /// If true, this disk is all done for this request context, including any cleanup. + /// If done is true, it means that this request must not be on this disk's queue + /// *AND* there are no threads currently working on this context. To satisfy + /// this, only the last thread (per disk) can set this to true. + bool done_ = true; + + /// For each disk, keeps track if the context is on this disk's queue, indicating + /// the disk must do some work for this context. The disk needs to do work in 4 cases: + /// 1) in_flight_ranges is not empty, the disk needs to read for this reader. + /// 2) next_range_to_start is NULL, the disk needs to prepare a scan range to be + /// read next. + /// 3) the reader has been cancelled and this disk needs to participate in the + /// cleanup. + /// 4) A write range is added to queue. + /// In general, we only want to put a context on the disk queue if there is something + /// useful that can be done. 
If there's nothing useful, the disk queue will wake up +/// and then remove the reader from the queue. That needless wakeup causes thrashing +/// of the threads. + bool is_on_queue_ = false; + + /// For each disk, the number of request ranges that have not been fully read. + /// In the non-cancellation path, this will hit 0, and done will be set to true + /// by the disk thread. This is undefined in the cancellation path (the various + /// threads notice by looking at the RequestContext's state_). + int num_remaining_ranges_ = 0; + + /// Queue of ranges that have not started being read. This list is exclusive + /// with in_flight_ranges. + InternalQueue<ScanRange> unstarted_scan_ranges_; + + /// Queue of pending IO requests for this disk in the order that they will be + /// processed. A ScanRange is added to this queue when it is returned in + /// GetNextRange(), or when it is added with schedule_immediately = true. + /// A WriteRange is added to this queue from unstarted_write_ranges_ for each + /// invocation of GetNextRequestRange() in WorkLoop(). + /// The size of this queue is always less than or equal to num_remaining_ranges. + InternalQueue<RequestRange> in_flight_ranges_; + + /// The next range to start for this reader on this disk. Each disk (for each reader) + /// picks the next range to start. The range is set here and also added to the + /// ready_to_start_ranges_ queue. The reader pulls from the queue in FIFO order, + /// so the ranges from different disks are round-robined. When the range is pulled + /// off the ready_to_start_ranges_ queue, it sets this variable to NULL, so the disk + /// knows to populate it again and add it to ready_to_start_ranges_, i.e. it is used + /// as a flag by DiskIoMgr::GetNextScanRange to determine if it needs to add another + /// range to ready_to_start_ranges_. + ScanRange* next_scan_range_to_start_ = nullptr; + + /// For each disk, the number of threads issuing the underlying read/write on behalf + /// of this context. There are a few places where we release the context lock, do some + /// work, and then grab the lock again. Because we don't hold the lock for the + /// entire operation, we need this ref count to keep track of which thread should do + /// final resource cleanup during cancellation. + /// Only the thread that sees the count at 0 should do the final cleanup. + AtomicInt32 num_threads_in_op_{0}; + + /// Queue of write ranges to process for this disk. A write range is always added + /// to in_flight_ranges_ in GetNextRequestRange(). There is a separate + /// unstarted_scan_ranges_ and unstarted_write_ranges_ to alternate between reads + /// and writes. (Otherwise, since next_scan_range_to_start is set + /// in GetNextRequestRange() whenever it is null, repeated calls to + /// GetNextRequestRange() and GetNextRange() may result in only reads being processed.) + InternalQueue<WriteRange> unstarted_write_ranges_; + }; + + /// Per disk states to synchronize multiple disk threads accessing the same request + /// context.
+ std::vector<PerDiskState> disk_states_; +}; +} +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/io/request-ranges.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h new file mode 100644 index 0000000..c1b3bbe --- /dev/null +++ b/be/src/runtime/io/request-ranges.h @@ -0,0 +1,471 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_RUNTIME_IO_REQUEST_RANGES_H +#define IMPALA_RUNTIME_IO_REQUEST_RANGES_H + +#include <cstdint> +#include <deque> + +#include <boost/thread/mutex.hpp> + +#include "common/hdfs.h" +#include "common/status.h" +#include "util/condition-variable.h" +#include "util/internal-queue.h" + +namespace impala { +class MemTracker; + +namespace io { +class DiskIoMgr; +class RequestContext; +class HdfsFileHandle; +class ScanRange; + +/// Buffer struct that is used by the caller and IoMgr to pass read buffers. +/// It is expected that only one thread has ownership of this object at a +/// time. +class BufferDescriptor { + public: + ~BufferDescriptor() { + DCHECK(buffer_ == nullptr); // Check we didn't leak a buffer. + } + + ScanRange* scan_range() { return scan_range_; } + uint8_t* buffer() { return buffer_; } + int64_t buffer_len() { return buffer_len_; } + int64_t len() { return len_; } + bool eosr() { return eosr_; } + + /// Returns the offset within the scan range that this buffer starts at + int64_t scan_range_offset() const { return scan_range_offset_; } + + /// Transfer ownership of buffer memory from 'mem_tracker_' to 'dst' and set + /// 'mem_tracker_' to 'dst'. 'mem_tracker_' and 'dst' must be non-NULL. Does not + /// check memory limits on 'dst': the caller should check the memory limit if a + /// different memory limit may apply to 'dst'. If the buffer was a client-provided + /// buffer, transferring is not allowed. + /// TODO: IMPALA-3209: revisit this as part of scanner memory usage revamp. + void TransferOwnership(MemTracker* dst); + + private: + friend class DiskIoMgr; + friend class ScanRange; + friend class RequestContext; + + /// Create a buffer descriptor for a new reader, range and data buffer. The buffer + /// memory should already be accounted against 'mem_tracker'. + BufferDescriptor(DiskIoMgr* io_mgr, RequestContext* reader, + ScanRange* scan_range, uint8_t* buffer, int64_t buffer_len, + MemTracker* mem_tracker); + + /// Return true if this is a cached buffer owned by HDFS. + bool is_cached() const; + + /// Return true if this is a buffer owned by the client, provided when + /// constructing the scan range. + bool is_client_buffer() const; + + DiskIoMgr* const io_mgr_; + + /// Reader that this buffer is for.
+ RequestContext* const reader_; + + /// The current tracker this buffer is associated with. After initialisation, + /// NULL for cached buffers and non-NULL for all other buffers. + MemTracker* mem_tracker_; + + /// Scan range that this buffer is for. Non-NULL when initialised. + ScanRange* const scan_range_; + + /// buffer with the read contents + uint8_t* buffer_; + + /// length of buffer_. For buffers from cached reads, the length is 0. + const int64_t buffer_len_; + + /// length of read contents + int64_t len_ = 0; + + /// true if the current scan range is complete + bool eosr_ = false; + + /// Status of the read to this buffer. if status is not ok, 'buffer' is nullptr + Status status_; + + int64_t scan_range_offset_ = 0; +}; + +/// The request type, read or write associated with a request range. +struct RequestType { + enum type { + READ, + WRITE, + }; +}; + +/// Represents a contiguous sequence of bytes in a single file. +/// This is the common base class for read and write IO requests - ScanRange and +/// WriteRange. Each disk thread processes exactly one RequestRange at a time. +class RequestRange : public InternalQueue<RequestRange>::Node { + public: + hdfsFS fs() const { return fs_; } + const char* file() const { return file_.c_str(); } + std::string* file_string() { return &file_; } + int64_t offset() const { return offset_; } + int64_t len() const { return len_; } + int disk_id() const { return disk_id_; } + RequestType::type request_type() const { return request_type_; } + + protected: + RequestRange(RequestType::type request_type) + : fs_(nullptr), offset_(-1), len_(-1), disk_id_(-1), request_type_(request_type) {} + + /// Hadoop filesystem that contains file_, or set to nullptr for local filesystem. + hdfsFS fs_; + + /// Path to file being read or written. + std::string file_; + + /// Offset within file_ being read or written. + int64_t offset_; + + /// Length of data read or written. + int64_t len_; + + /// Id of disk containing byte range. + int disk_id_; + + /// The type of IO request, READ or WRITE. + RequestType::type request_type_; +}; + +/// Param struct for different combinations of buffering. +struct BufferOpts { + public: + // Set options for a read into an IoMgr-allocated or HDFS-cached buffer. Caching is + // enabled if 'try_cache' is true, the file is in the HDFS cache and 'mtime' matches + // the modified time of the cached file in the HDFS cache. + BufferOpts(bool try_cache, int64_t mtime) + : try_cache_(try_cache), + mtime_(mtime), + client_buffer_(nullptr), + client_buffer_len_(-1) {} + + /// Set options for an uncached read into an IoMgr-allocated buffer. + static BufferOpts Uncached() { + return BufferOpts(false, NEVER_CACHE, nullptr, -1); + } + + /// Set options to read the entire scan range into 'client_buffer'. The length of the + /// buffer, 'client_buffer_len', must fit the entire scan range. HDFS caching is not + /// enabled in this case. + static BufferOpts ReadInto(uint8_t* client_buffer, int64_t client_buffer_len) { + return BufferOpts(false, NEVER_CACHE, client_buffer, client_buffer_len); + } + + private: + friend class ScanRange; + + BufferOpts( + bool try_cache, int64_t mtime, uint8_t* client_buffer, int64_t client_buffer_len) + : try_cache_(try_cache), + mtime_(mtime), + client_buffer_(client_buffer), + client_buffer_len_(client_buffer_len) {} + + /// If 'mtime_' is set to NEVER_CACHE, the file handle will never be cached, because + /// the modification time won't match. 
+ const static int64_t NEVER_CACHE = -1; + + /// If true, read from HDFS cache if possible. + const bool try_cache_; + + /// Last modified time of the file associated with the scan range. If set to + /// NEVER_CACHE, caching is disabled. + const int64_t mtime_; + + /// A destination buffer provided by the client, nullptr and -1 if no buffer. + uint8_t* const client_buffer_; + const int64_t client_buffer_len_; +}; + +/// ScanRange description. The caller must call Reset() to initialize the fields +/// before calling AddScanRanges(). The private fields are used internally by +/// the IoMgr. +class ScanRange : public RequestRange { + public: + ScanRange(); + + virtual ~ScanRange(); + + /// Resets this scan range object with the scan range description. The scan range + /// is for bytes [offset, offset + len) in 'file' on 'fs' (which is nullptr for the + /// local filesystem). The scan range must fall within the file bounds (offset >= 0 + /// and offset + len <= file_length). 'disk_id' is the disk queue to add the range + /// to. If 'expected_local' is true, a warning is generated if the read did not + /// come from a local disk. 'buffer_opts' specifies buffer management options - + /// see the DiskIoMgr class comment and the BufferOpts comments for details. + /// 'meta_data' is an arbitrary client-provided pointer for any auxiliary data. + void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id, + bool expected_local, const BufferOpts& buffer_opts, void* meta_data = nullptr); + + void* meta_data() const { return meta_data_; } + bool try_cache() const { return try_cache_; } + bool expected_local() const { return expected_local_; } + + /// Returns the next buffer for this scan range. buffer is an output parameter. + /// This function blocks until a buffer is ready or an error occurred. If this is + /// called when all buffers have been returned, *buffer is set to nullptr and Status::OK + /// is returned. + /// Only one thread can be in GetNext() at any time. + Status GetNext(std::unique_ptr<BufferDescriptor>* buffer) WARN_UNUSED_RESULT; + + /// Cancel this scan range. This cleans up all queued buffers and + /// wakes up any threads blocked on GetNext(). + /// Status is the reason the range was cancelled. Must not be ok(). + /// Status is returned to the user in GetNext(). + void Cancel(const Status& status); + + /// return a descriptive string for debug. + std::string DebugString() const; + + int64_t mtime() const { return mtime_; } + + private: + friend class BufferDescriptor; + friend class DiskIoMgr; + friend class RequestContext; + + /// Initialize internal fields + void InitInternal(DiskIoMgr* io_mgr, RequestContext* reader); + + /// Enqueues a buffer for this range. This does not block. + /// Returns true if this scan range has hit the queue capacity, false otherwise. + /// The caller passes ownership of buffer to the scan range and it is not + /// valid to access buffer after this call. The reader lock must be held by the + /// caller. + bool EnqueueBuffer(const boost::unique_lock<boost::mutex>& reader_lock, + std::unique_ptr<BufferDescriptor> buffer); + + /// Cleanup any queued buffers (i.e. due to cancellation). This cannot + /// be called with any locks taken. + void CleanupQueuedBuffers(); + + /// Validates the internal state of this range. lock_ must be taken + /// before calling this. + bool Validate(); + + /// Maximum length in bytes for hdfsRead() calls. + int64_t MaxReadChunkSize() const; + + /// Opens the file for this range. 
This function only modifies state in this range. + /// If 'use_file_handle_cache' is true and this is a local hdfs file, then this scan + /// range will not maintain an exclusive file handle. It will borrow an hdfs file + /// handle from the file handle cache for each Read(), so Open() does nothing. + /// If 'use_file_handle_cache' is false or this is a remote hdfs file or this is + /// a local OS file, Open() will maintain a file handle on the scan range for + /// exclusive use by this scan range. An exclusive hdfs file handle still comes + /// from the cache, but it is a newly opened file handle that is held for the + /// entire duration of a scan range's lifetime and destroyed in Close(). + /// All local OS files are opened using normal OS file APIs. + Status Open(bool use_file_handle_cache) WARN_UNUSED_RESULT; + + /// Closes the file for this range. This function only modifies state in this range. + void Close(); + + /// Reads from this range into 'buffer', which has length 'buffer_len' bytes. Returns + /// the number of bytes read. The read position in this scan range is updated. + Status Read(uint8_t* buffer, int64_t buffer_len, int64_t* bytes_read, + bool* eosr) WARN_UNUSED_RESULT; + + /// Get the read statistics from the Hdfs file handle and aggregate them to + /// the RequestContext. This clears the statistics on this file handle. + /// It is safe to pass hdfsFile by value, as hdfsFile's underlying type is a + /// pointer. + void GetHdfsStatistics(hdfsFile fh); + + /// Reads from the DN cache. On success, sets cached_buffer_ to the DN buffer + /// and *read_succeeded to true. + /// If the data is not cached, returns ok() and *read_succeeded is set to false. + /// Returns a non-ok status if it ran into a non-continuable error. + /// The reader lock must be held by the caller. + Status ReadFromCache(const boost::unique_lock<boost::mutex>& reader_lock, + bool* read_succeeded) WARN_UNUSED_RESULT; + + /// Pointer to caller specified metadata. This is untouched by the io manager + /// and the caller can put whatever auxiliary data in here. + void* meta_data_ = nullptr; + + /// If true, this scan range is expected to be cached. Note that this might be wrong + /// since the block could have been uncached. In that case, the cached path + /// will fail and we'll just put the scan range on the normal read path. + bool try_cache_ = false; + + /// If true, we expect this scan range to be a local read. Note that if this is false, + /// it does not necessarily mean we expect the read to be remote, and that we never + /// create scan ranges where some of the range is expected to be remote and some of it + /// local. + /// TODO: we can do more with this + bool expected_local_ = false; + + /// Total number of bytes read remotely. This is necessary to maintain a count of + /// the number of remote scan ranges. Since IO statistics can be collected multiple + /// times for a scan range, it is necessary to keep some state about whether this + /// scan range has already been counted as remote. There is also a requirement to + /// log the number of unexpected remote bytes for a scan range. To solve both + /// requirements, maintain num_remote_bytes_ on the ScanRange and push it to the + /// reader_ once at the close of the scan range. + int64_t num_remote_bytes_; + + DiskIoMgr* io_mgr_ = nullptr; + + /// Reader/owner of the scan range + RequestContext* reader_ = nullptr; + + /// File handle either to hdfs or local fs (FILE*) + /// The hdfs file handle is only stored here in three cases: + /// 1. 
The file handle cache is off (max_cached_file_handles == 0). + /// 2. The scan range is using hdfs caching. + /// -OR- + /// 3. The hdfs file is expected to be remote (expected_local_ == false) + /// In each case, the scan range gets a new file handle from the file handle cache + /// at Open(), holds it exclusively, and destroys it in Close(). + union { + FILE* local_file_ = nullptr; + HdfsFileHandle* exclusive_hdfs_fh_; + }; + + /// Tagged union that holds a buffer for the cases when there is a buffer allocated + /// externally from DiskIoMgr that is associated with the scan range. + enum class ExternalBufferTag { CLIENT_BUFFER, CACHED_BUFFER, NO_BUFFER }; + ExternalBufferTag external_buffer_tag_; + union { + /// Valid if the 'external_buffer_tag_' is CLIENT_BUFFER. + struct { + /// Client-provided buffer to read the whole scan range into. + uint8_t* data; + + /// Length of the client-provided buffer. + int64_t len; + } client_buffer_; + + /// Valid and non-NULL if the external_buffer_tag_ is CACHED_BUFFER, which means + /// that a cached read succeeded and all the bytes for the range are in this buffer. + struct hadoopRzBuffer* cached_buffer_ = nullptr; + }; + + /// Lock protecting fields below. + /// This lock should not be taken during Open()/Read()/Close(). + /// If RequestContext::lock_ and this lock need to be held simultaneously, + /// RequestContext::lock_ must be taken first. + boost::mutex lock_; + + /// Number of bytes read so far for this scan range + int bytes_read_; + + /// Status for this range. This is non-ok if is_cancelled_ is true. + /// Note: an individual range can fail without the RequestContext being + /// cancelled. This allows us to skip individual ranges. + Status status_; + + /// If true, the last buffer for this scan range has been queued. + bool eosr_queued_ = false; + + /// If true, the last buffer for this scan range has been returned. + bool eosr_returned_ = false; + + /// If true, this scan range has been removed from the reader's in_flight_ranges + /// queue because the ready_buffers_ queue is full. + bool blocked_on_queue_ = false; + + /// IO buffers that are queued for this scan range. + /// Condition variable for GetNext + ConditionVariable buffer_ready_cv_; + std::deque<std::unique_ptr<BufferDescriptor>> ready_buffers_; + + /// Lock that should be taken during hdfs calls. Only one thread (the disk reading + /// thread) calls into hdfs at a time so this lock does not have performance impact. + /// This lock only serves to coordinate cleanup. Specifically it serves to ensure + /// that the disk threads are finished with HDFS calls before is_cancelled_ is set + /// to true and cleanup starts. + /// If this lock and lock_ need to be taken, lock_ must be taken first. + boost::mutex hdfs_lock_; + + /// If true, this scan range has been cancelled. + bool is_cancelled_ = false; + + /// Last modified time of the file associated with the scan range + int64_t mtime_; +}; + +/// Used to specify data to be written to a file and offset. +/// It is the responsibility of the client to ensure that the data to be written is +/// valid and that the file to be written to exists until the callback is invoked. +/// A callback is invoked to inform the client when the write is done. +class WriteRange : public RequestRange { + public: + /// This callback is invoked on each WriteRange after the write is complete or the + /// context is cancelled. The status returned by the callback parameter indicates + /// if the write was successful (i.e. 
Status::OK), if there was an error +/// (TStatusCode::RUNTIME_ERROR), or if the context was cancelled +/// (TStatusCode::CANCELLED). The callback is only invoked if this WriteRange was +/// successfully added (i.e. AddWriteRange() succeeded). No locks are held while +/// the callback is invoked. + typedef std::function<void(const Status&)> WriteDoneCallback; + WriteRange(const std::string& file, int64_t file_offset, int disk_id, + WriteDoneCallback callback); + + /// Change the file and offset of this write range. Data and callbacks are unchanged. + /// Can only be called when the write is not in flight (i.e. before AddWriteRange() + /// is called or after the write callback was called). + void SetRange(const std::string& file, int64_t file_offset, int disk_id); + + /// Set the data and number of bytes to be written for this WriteRange. + /// Can only be called when the write is not in flight (i.e. before AddWriteRange() + /// is called or after the write callback was called). + void SetData(const uint8_t* buffer, int64_t len); + + const uint8_t* data() const { return data_; } + + private: + friend class DiskIoMgr; + friend class RequestContext; + friend class ScanRange; + + /// Data to be written. RequestRange::len_ contains the length of data + /// to be written. + const uint8_t* data_; + + /// Callback to invoke after the write is complete. + WriteDoneCallback callback_; +}; + +inline bool BufferDescriptor::is_cached() const { + return scan_range_->external_buffer_tag_ + == ScanRange::ExternalBufferTag::CACHED_BUFFER; +} + +inline bool BufferDescriptor::is_client_buffer() const { + return scan_range_->external_buffer_tag_ + == ScanRange::ExternalBufferTag::CLIENT_BUFFER; +} +} +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/io/scan-range.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc new file mode 100644 index 0000000..b7655a8 --- /dev/null +++ b/be/src/runtime/io/scan-range.cc @@ -0,0 +1,593 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/io/disk-io-mgr.h" +#include "runtime/io/disk-io-mgr-internal.h" +#include "util/error-util.h" +#include "util/hdfs-util.h" + +#include "common/names.h" + +using namespace impala; +using namespace impala::io; + +DEFINE_bool(use_hdfs_pread, false, "Enables using hdfsPread() instead of hdfsRead() " + "when performing HDFS read operations. This is necessary to use HDFS hedged reads " + "(assuming the HDFS client is configured to do so)."); + +// TODO: Run perf tests and empirically settle on the optimal default value for the +// read buffer size. Currently setting it as 128k for the same reason as for S3, i.e.
+// due to JNI array allocation and memcpy overhead, 128k was emperically found to have the +// least overhead. +DEFINE_int64(adls_read_chunk_size, 128 * 1024, "The maximum read chunk size to use when " + "reading from ADLS."); + +// Implementation of the ScanRange functionality. Each ScanRange contains a queue +// of ready buffers. For each ScanRange, there is only a single producer and +// consumer thread, i.e. only one disk thread will push to a scan range at +// any time and only one thread will remove from the queue. This is to guarantee +// that buffers are queued and read in file order. + +bool ScanRange::EnqueueBuffer( + const unique_lock<mutex>& reader_lock, unique_ptr<BufferDescriptor> buffer) { + DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock()); + { + unique_lock<mutex> scan_range_lock(lock_); + DCHECK(Validate()) << DebugString(); + DCHECK(!eosr_returned_); + DCHECK(!eosr_queued_); + if (is_cancelled_) { + // Return the buffer, this range has been cancelled + if (buffer->buffer_ != nullptr) { + io_mgr_->num_buffers_in_readers_.Add(1); + reader_->num_buffers_in_reader_.Add(1); + } + reader_->num_used_buffers_.Add(-1); + io_mgr_->ReturnBuffer(move(buffer)); + return false; + } + reader_->num_ready_buffers_.Add(1); + eosr_queued_ = buffer->eosr(); + ready_buffers_.emplace_back(move(buffer)); + + DCHECK_LE(ready_buffers_.size(), DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT); + blocked_on_queue_ = ready_buffers_.size() == DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT; + } + + buffer_ready_cv_.NotifyOne(); + + return blocked_on_queue_; +} + +Status ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) { + DCHECK(*buffer == nullptr); + bool eosr; + { + unique_lock<mutex> scan_range_lock(lock_); + if (eosr_returned_) return Status::OK(); + DCHECK(Validate()) << DebugString(); + + while (ready_buffers_.empty() && !is_cancelled_) { + buffer_ready_cv_.Wait(scan_range_lock); + } + + if (is_cancelled_) { + DCHECK(!status_.ok()); + return status_; + } + + // Remove the first ready buffer from the queue and return it + DCHECK(!ready_buffers_.empty()); + DCHECK_LE(ready_buffers_.size(), DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT); + *buffer = move(ready_buffers_.front()); + ready_buffers_.pop_front(); + eosr_returned_ = (*buffer)->eosr(); + eosr = (*buffer)->eosr(); + } + + // Update tracking counters. The buffer has now moved from the IoMgr to the + // caller. + io_mgr_->num_buffers_in_readers_.Add(1); + reader_->num_buffers_in_reader_.Add(1); + reader_->num_ready_buffers_.Add(-1); + reader_->num_used_buffers_.Add(-1); + if (eosr) reader_->num_finished_ranges_.Add(1); + + Status status = (*buffer)->status_; + if (!status.ok()) { + io_mgr_->ReturnBuffer(move(*buffer)); + return status; + } + + unique_lock<mutex> reader_lock(reader_->lock_); + + DCHECK(reader_->Validate()) << endl << reader_->DebugString(); + if (reader_->state_ == RequestContext::Cancelled) { + reader_->blocked_ranges_.Remove(this); + Cancel(reader_->status_); + io_mgr_->ReturnBuffer(move(*buffer)); + return status_; + } + + { + // Check to see if we can re-schedule a blocked range. Note that EnqueueBuffer() + // may have been called after we released 'lock_' above so we need to re-check + // whether the queue is full. + unique_lock<mutex> scan_range_lock(lock_); + if (blocked_on_queue_ + && ready_buffers_.size() < DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT + && !eosr_queued_) { + blocked_on_queue_ = false; + // This scan range was blocked and is no longer, add it to the reader + // queue again. 
+      reader_->blocked_ranges_.Remove(this);
+      reader_->ScheduleScanRange(this);
+    }
+  }
+  return Status::OK();
+}
+
+void ScanRange::Cancel(const Status& status) {
+  // Cancelling a range that was never started; ignore it.
+  if (io_mgr_ == nullptr) return;
+
+  DCHECK(!status.ok());
+  {
+    // Grab both locks to make sure that all working threads see is_cancelled_.
+    unique_lock<mutex> scan_range_lock(lock_);
+    unique_lock<mutex> hdfs_lock(hdfs_lock_);
+    DCHECK(Validate()) << DebugString();
+    if (is_cancelled_) return;
+    is_cancelled_ = true;
+    status_ = status;
+  }
+  buffer_ready_cv_.NotifyAll();
+  CleanupQueuedBuffers();
+
+  // For cached buffers, we can't close the range until the cached buffer is returned.
+  // Close() is called from DiskIoMgr::ReturnBuffer().
+  if (external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) Close();
+}
+
+void ScanRange::CleanupQueuedBuffers() {
+  DCHECK(is_cancelled_);
+  io_mgr_->num_buffers_in_readers_.Add(ready_buffers_.size());
+  reader_->num_buffers_in_reader_.Add(ready_buffers_.size());
+  reader_->num_used_buffers_.Add(-ready_buffers_.size());
+  reader_->num_ready_buffers_.Add(-ready_buffers_.size());
+
+  while (!ready_buffers_.empty()) {
+    io_mgr_->ReturnBuffer(move(ready_buffers_.front()));
+    ready_buffers_.pop_front();
+  }
+}
+
+string ScanRange::DebugString() const {
+  stringstream ss;
+  ss << "file=" << file_ << " disk_id=" << disk_id_ << " offset=" << offset_
+     << " len=" << len_ << " bytes_read=" << bytes_read_
+     << " buffer_queue=" << ready_buffers_.size()
+     << " hdfs_file=" << exclusive_hdfs_fh_;
+  return ss.str();
+}
+
+bool ScanRange::Validate() {
+  if (bytes_read_ > len_) {
+    LOG(WARNING) << "Bytes read tracking is wrong. Shouldn't read past the scan range."
+                 << " bytes_read_=" << bytes_read_ << " len_=" << len_;
+    return false;
+  }
+  if (eosr_returned_ && !eosr_queued_) {
+    LOG(WARNING) << "Returned eosr to reader before finishing reading the scan range"
+                 << " eosr_returned_=" << eosr_returned_
+                 << " eosr_queued_=" << eosr_queued_;
+    return false;
+  }
+  return true;
+}
+
+ScanRange::ScanRange()
+  : RequestRange(RequestType::READ),
+    num_remote_bytes_(0),
+    external_buffer_tag_(ExternalBufferTag::NO_BUFFER),
+    mtime_(-1) {}
+
+ScanRange::~ScanRange() {
+  DCHECK(exclusive_hdfs_fh_ == nullptr) << "File was not closed.";
+  DCHECK(external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER)
+      << "Cached buffer was not released.";
+}
+
+void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
+    int disk_id, bool expected_local, const BufferOpts& buffer_opts, void* meta_data) {
+  DCHECK(ready_buffers_.empty());
+  DCHECK(file != nullptr);
+  DCHECK_GE(len, 0);
+  DCHECK_GE(offset, 0);
+  DCHECK(buffer_opts.client_buffer_ == nullptr ||
+         buffer_opts.client_buffer_len_ >= len_);
+  fs_ = fs;
+  file_ = file;
+  len_ = len;
+  offset_ = offset;
+  disk_id_ = disk_id;
+  try_cache_ = buffer_opts.try_cache_;
+  mtime_ = buffer_opts.mtime_;
+  expected_local_ = expected_local;
+  num_remote_bytes_ = 0;
+  meta_data_ = meta_data;
+  if (buffer_opts.client_buffer_ != nullptr) {
+    external_buffer_tag_ = ExternalBufferTag::CLIENT_BUFFER;
+    client_buffer_.data = buffer_opts.client_buffer_;
+    client_buffer_.len = buffer_opts.client_buffer_len_;
+  } else {
+    external_buffer_tag_ = ExternalBufferTag::NO_BUFFER;
+  }
+  io_mgr_ = nullptr;
+  reader_ = nullptr;
+  exclusive_hdfs_fh_ = nullptr;
+}
+
+void ScanRange::InitInternal(DiskIoMgr* io_mgr, RequestContext* reader) {
+  DCHECK(exclusive_hdfs_fh_ == nullptr);
+  DCHECK(local_file_ == nullptr);
+  // Reader must provide a MemTracker or a buffer.
+  DCHECK(external_buffer_tag_ == ExternalBufferTag::CLIENT_BUFFER
+      || reader->mem_tracker_ != nullptr);
+  io_mgr_ = io_mgr;
+  reader_ = reader;
+  local_file_ = nullptr;
+  exclusive_hdfs_fh_ = nullptr;
+  bytes_read_ = 0;
+  is_cancelled_ = false;
+  eosr_queued_ = false;
+  eosr_returned_ = false;
+  blocked_on_queue_ = false;
+  DCHECK(Validate()) << DebugString();
+}
+
+Status ScanRange::Open(bool use_file_handle_cache) {
+  unique_lock<mutex> hdfs_lock(hdfs_lock_);
+  if (is_cancelled_) return Status::CANCELLED;
+
+  if (fs_ != nullptr) {
+    if (exclusive_hdfs_fh_ != nullptr) return Status::OK();
+    // With file handle caching, the scan range does not maintain its own
+    // hdfs file handle. File handle caching is only used for local files,
+    // so S3 and remote filesystems should obtain an exclusive file handle
+    // for each scan range.
+    if (use_file_handle_cache && expected_local_) return Status::OK();
+    // Get a new exclusive file handle.
+    exclusive_hdfs_fh_ = io_mgr_->GetCachedHdfsFileHandle(fs_, file_string(),
+        mtime(), reader_, true);
+    if (exclusive_hdfs_fh_ == nullptr) {
+      return Status(TErrorCode::DISK_IO_ERROR,
+          GetHdfsErrorMsg("Failed to open HDFS file ", file_));
+    }
+
+    if (hdfsSeek(fs_, exclusive_hdfs_fh_->file(), offset_) != 0) {
+      // Destroy the file handle and remove it from the cache.
+      io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), exclusive_hdfs_fh_, true);
+      exclusive_hdfs_fh_ = nullptr;
+      return Status(TErrorCode::DISK_IO_ERROR,
+          Substitute("Error seeking to $0 in file: $1 $2", offset_, file_,
+          GetHdfsErrorMsg("")));
+    }
+  } else {
+    if (local_file_ != nullptr) return Status::OK();
+
+    local_file_ = fopen(file(), "r");
+    if (local_file_ == nullptr) {
+      return Status(TErrorCode::DISK_IO_ERROR, Substitute("Could not open file: $0: $1",
+          file_, GetStrErrMsg()));
+    }
+    if (fseek(local_file_, offset_, SEEK_SET) == -1) {
+      fclose(local_file_);
+      local_file_ = nullptr;
+      return Status(TErrorCode::DISK_IO_ERROR, Substitute("Could not seek to $0 "
+          "for file: $1: $2", offset_, file_, GetStrErrMsg()));
+    }
+  }
+  if (ImpaladMetrics::IO_MGR_NUM_OPEN_FILES != nullptr) {
+    ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(1L);
+  }
+  return Status::OK();
+}
+
+void ScanRange::Close() {
+  unique_lock<mutex> hdfs_lock(hdfs_lock_);
+  bool closed_file = false;
+  if (fs_ != nullptr) {
+    if (exclusive_hdfs_fh_ != nullptr) {
+      GetHdfsStatistics(exclusive_hdfs_fh_->file());
+
+      if (external_buffer_tag_ == ExternalBufferTag::CACHED_BUFFER) {
+        hadoopRzBufferFree(exclusive_hdfs_fh_->file(), cached_buffer_);
+        cached_buffer_ = nullptr;
+        external_buffer_tag_ = ExternalBufferTag::NO_BUFFER;
+      }
+
+      // Destroy the file handle and remove it from the cache.
+      io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), exclusive_hdfs_fh_, true);
+      exclusive_hdfs_fh_ = nullptr;
+      closed_file = true;
+    }
+
+    if (FLAGS_use_hdfs_pread) {
+      // Update Hedged Read Metrics.
+      // We call it only if the --use_hdfs_pread flag is set, to avoid having the
+      // libhdfs client malloc and free a hdfsHedgedReadMetrics object unnecessarily
+      // otherwise. 'hedged_metrics' is only set upon success.
+      struct hdfsHedgedReadMetrics* hedged_metrics;
+      int success = hdfsGetHedgedReadMetrics(fs_, &hedged_metrics);
+      if (success == 0) {
+        ImpaladMetrics::HEDGED_READ_OPS->set_value(hedged_metrics->hedgedReadOps);
+        ImpaladMetrics::HEDGED_READ_OPS_WIN->set_value(hedged_metrics->hedgedReadOpsWin);
+        hdfsFreeHedgedReadMetrics(hedged_metrics);
+      }
+    }
+
+    if (num_remote_bytes_ > 0) {
+      reader_->num_remote_ranges_.Add(1L);
+      if (expected_local_) {
+        reader_->unexpected_remote_bytes_.Add(num_remote_bytes_);
+        VLOG_FILE << "Unexpected remote HDFS read of "
+                  << PrettyPrinter::Print(num_remote_bytes_, TUnit::BYTES)
+                  << " for file '" << file_ << "'";
+      }
+    }
+  } else {
+    if (local_file_ == nullptr) return;
+    fclose(local_file_);
+    local_file_ = nullptr;
+    closed_file = true;
+  }
+  if (closed_file && ImpaladMetrics::IO_MGR_NUM_OPEN_FILES != nullptr) {
+    ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(-1L);
+  }
+}
+
+int64_t ScanRange::MaxReadChunkSize() const {
+  // S3 InputStreams don't support DIRECT_READ (i.e. the java.nio.ByteBuffer read()
+  // interface). So, hdfsRead() needs to allocate a Java byte[] and copy the data out.
+  // Profiles show that both the JNI array allocation and the memcpy add much more
+  // overhead for larger buffers, so limit the size of each read request. 128K was
+  // chosen empirically by trying values between 4K and 8M and optimizing for lower CPU
+  // utilization and higher S3 throughput.
+  if (disk_id_ == io_mgr_->RemoteS3DiskId()) {
+    DCHECK(IsS3APath(file()));
+    return 128 * 1024;
+  }
+  if (disk_id_ == io_mgr_->RemoteAdlsDiskId()) {
+    DCHECK(IsADLSPath(file()));
+    return FLAGS_adls_read_chunk_size;
+  }
+  // The length argument of hdfsRead() is an int. Ensure we don't overflow it.
+  return numeric_limits<int>::max();
+}
+
+// TODO: How do we best use the disk here? E.g., is it good to break up a
+// 1MB read into 8 128K reads?
+// TODO: look at linux disk scheduling
+Status ScanRange::Read(
+    uint8_t* buffer, int64_t buffer_len, int64_t* bytes_read, bool* eosr) {
+  unique_lock<mutex> hdfs_lock(hdfs_lock_);
+  if (is_cancelled_) return Status::CANCELLED;
+
+  *eosr = false;
+  *bytes_read = 0;
+  // Read until the end of the scan range or the end of the buffer.
+  int bytes_to_read = min(len_ - bytes_read_, buffer_len);
+  DCHECK_GE(bytes_to_read, 0);
+
+  if (fs_ != nullptr) {
+    HdfsFileHandle* borrowed_hdfs_fh = nullptr;
+    hdfsFile hdfs_file;
+
+    // If the scan range has an exclusive file handle, use it. Otherwise, borrow
+    // a file handle from the cache.
+    if (exclusive_hdfs_fh_ != nullptr) {
+      hdfs_file = exclusive_hdfs_fh_->file();
+    } else {
+      borrowed_hdfs_fh = io_mgr_->GetCachedHdfsFileHandle(fs_, file_string(),
+          mtime(), reader_, false);
+      if (borrowed_hdfs_fh == nullptr) {
+        return Status(TErrorCode::DISK_IO_ERROR,
+            GetHdfsErrorMsg("Failed to open HDFS file ", file_));
+      }
+      hdfs_file = borrowed_hdfs_fh->file();
+    }
+
+    int64_t max_chunk_size = MaxReadChunkSize();
+    Status status = Status::OK();
+    while (*bytes_read < bytes_to_read) {
+      int chunk_size = min(bytes_to_read - *bytes_read, max_chunk_size);
+      DCHECK_GE(chunk_size, 0);
+      // The hdfsRead() length argument is an int.
+      DCHECK_LE(chunk_size, numeric_limits<int>::max());
+      int current_bytes_read = -1;
+      // bytes_read_ is only updated after the while loop
+      int64_t position_in_file = offset_ + bytes_read_ + *bytes_read;
+      int num_retries = 0;
+      while (true) {
+        status = Status::OK();
+        // For file handles from the cache, any of the below file operations may fail
+        // due to a bad file handle. In each case, record the error, but allow for a
+        // retry to fix it.
+        if (FLAGS_use_hdfs_pread) {
+          current_bytes_read = hdfsPread(fs_, hdfs_file, position_in_file,
+              buffer + *bytes_read, chunk_size);
+          if (current_bytes_read == -1) {
+            status = Status(TErrorCode::DISK_IO_ERROR,
+                GetHdfsErrorMsg("Error reading from HDFS file: ", file_));
+          }
+        } else {
+          // If the file handle is borrowed, it may not be at the appropriate
+          // location. Seek to the appropriate location.
+          bool seek_failed = false;
+          if (borrowed_hdfs_fh != nullptr) {
+            if (hdfsSeek(fs_, hdfs_file, position_in_file) != 0) {
+              status = Status(TErrorCode::DISK_IO_ERROR, Substitute("Error seeking to $0 "
+                  "in file: $1: $2", position_in_file, file_, GetHdfsErrorMsg("")));
+              seek_failed = true;
+            }
+          }
+          if (!seek_failed) {
+            current_bytes_read = hdfsRead(fs_, hdfs_file, buffer + *bytes_read,
+                chunk_size);
+            if (current_bytes_read == -1) {
+              status = Status(TErrorCode::DISK_IO_ERROR,
+                  GetHdfsErrorMsg("Error reading from HDFS file: ", file_));
+            }
+          }
+        }
+
+        // Do not retry:
+        // - if the read was successful (current_bytes_read != -1)
+        // - or if we already retried once
+        // - or if this is not using a borrowed file handle
+        DCHECK_LE(num_retries, 1);
+        if (current_bytes_read != -1 || borrowed_hdfs_fh == nullptr ||
+            num_retries == 1) {
+          break;
+        }
+        // The error may be due to a bad file handle. Reopen the file handle and retry.
+        ++num_retries;
+        RETURN_IF_ERROR(io_mgr_->ReopenCachedHdfsFileHandle(fs_, file_string(),
+            mtime(), &borrowed_hdfs_fh));
+        hdfs_file = borrowed_hdfs_fh->file();
+      }
+      if (!status.ok()) break;
+      if (current_bytes_read == 0) {
+        // No more bytes in the file. The scan range went past the end.
+        *eosr = true;
+        break;
+      }
+      *bytes_read += current_bytes_read;
+
+      // Collect and accumulate statistics
+      GetHdfsStatistics(hdfs_file);
+    }
+
+    if (borrowed_hdfs_fh != nullptr) {
+      io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), borrowed_hdfs_fh, false);
+    }
+    if (!status.ok()) return status;
+  } else {
+    DCHECK(local_file_ != nullptr);
+    *bytes_read = fread(buffer, 1, bytes_to_read, local_file_);
+    DCHECK_GE(*bytes_read, 0);
+    DCHECK_LE(*bytes_read, bytes_to_read);
+    if (*bytes_read < bytes_to_read) {
+      if (ferror(local_file_) != 0) {
+        return Status(TErrorCode::DISK_IO_ERROR, Substitute("Error reading from $0 "
+            "at byte offset: $1: $2", file_, offset_ + bytes_read_, GetStrErrMsg()));
+      } else {
+        // On Linux, we should only get partial reads from block devices on error or eof.
+        DCHECK(feof(local_file_) != 0);
+        *eosr = true;
+      }
+    }
+  }
+  bytes_read_ += *bytes_read;
+  DCHECK_LE(bytes_read_, len_);
+  if (bytes_read_ == len_) *eosr = true;
+  return Status::OK();
+}
+
+Status ScanRange::ReadFromCache(
+    const unique_lock<mutex>& reader_lock, bool* read_succeeded) {
+  DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
+  DCHECK(try_cache_);
+  DCHECK_EQ(bytes_read_, 0);
+  *read_succeeded = false;
+  Status status = Open(false);
+  if (!status.ok()) return status;
+
+  // Cached reads are not supported on the local filesystem.
+  if (fs_ == nullptr) return Status::OK();
+
+  {
+    unique_lock<mutex> hdfs_lock(hdfs_lock_);
+    if (is_cancelled_) return Status::CANCELLED;
+
+    DCHECK(exclusive_hdfs_fh_ != nullptr);
+    DCHECK(external_buffer_tag_ == ExternalBufferTag::NO_BUFFER);
+    cached_buffer_ =
+        hadoopReadZero(exclusive_hdfs_fh_->file(), io_mgr_->cached_read_options_, len());
+    if (cached_buffer_ != nullptr) {
+      external_buffer_tag_ = ExternalBufferTag::CACHED_BUFFER;
+    }
+  }
+  // Data was not cached, caller will fall back to normal read path.
+  if (external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) {
+    VLOG_QUERY << "Cache read failed for scan range: " << DebugString()
+               << ". Switching to disk read path.";
+    // Clean up the scan range state before re-issuing it.
+    Close();
+    return Status::OK();
+  }
+
+  // Cached read returned a buffer, verify we read the correct amount of data.
+  void* buffer = const_cast<void*>(hadoopRzBufferGet(cached_buffer_));
+  int32_t bytes_read = hadoopRzBufferLength(cached_buffer_);
+  // A partial read can happen when files are truncated.
+  // TODO: If HDFS ever supports partially cached blocks, we'll have to distinguish
+  // between errors and partially cached blocks here.
+  if (bytes_read < len()) {
+    VLOG_QUERY << "Error reading file from HDFS cache: " << file_ << ". Expected "
+               << len() << " bytes, but read " << bytes_read
+               << ". Switching to disk read path.";
+    // Close the scan range. 'read_succeeded' is still false, so the caller will fall
+    // back to a non-cached read of this scan range.
+    Close();
+    return Status::OK();
+  }
+
+  // Create a single buffer desc for the entire scan range and enqueue that.
+  // 'mem_tracker' is nullptr because the memory is owned by the HDFS Java client,
+  // not the Impala backend.
+  unique_ptr<BufferDescriptor> desc = unique_ptr<BufferDescriptor>(new BufferDescriptor(
+      io_mgr_, reader_, this, reinterpret_cast<uint8_t*>(buffer), 0, nullptr));
+  desc->len_ = bytes_read;
+  desc->scan_range_offset_ = 0;
+  desc->eosr_ = true;
+  bytes_read_ = bytes_read;
+  EnqueueBuffer(reader_lock, move(desc));
+  if (reader_->bytes_read_counter_ != nullptr) {
+    COUNTER_ADD(reader_->bytes_read_counter_, bytes_read);
+  }
+  *read_succeeded = true;
+  reader_->num_used_buffers_.Add(1);
+  return Status::OK();
+}
+
+void ScanRange::GetHdfsStatistics(hdfsFile hdfs_file) {
+  struct hdfsReadStatistics* stats;
+  if (IsHdfsPath(file())) {
+    int success = hdfsFileGetReadStatistics(hdfs_file, &stats);
+    if (success == 0) {
+      reader_->bytes_read_local_.Add(stats->totalLocalBytesRead);
+      reader_->bytes_read_short_circuit_.Add(stats->totalShortCircuitBytesRead);
+      reader_->bytes_read_dn_cache_.Add(stats->totalZeroCopyBytesRead);
+      if (stats->totalLocalBytesRead != stats->totalBytesRead) {
+        num_remote_bytes_ += stats->totalBytesRead - stats->totalLocalBytesRead;
+      }
+      hdfsFileFreeReadStatistics(stats);
+    }
+    hdfsFileClearReadStatistics(hdfs_file);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index d246024..2c08f30 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -29,7 +29,7 @@
 #include "kudu/util/slice.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/descriptors.h"
-#include "runtime/disk-io-mgr.h"
+#include "runtime/io/disk-io-mgr.h"
 #include "runtime/mem-pool.h"
 
 namespace kudu {
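Aside: the retry loop buried in ScanRange::Read() in scan-range.cc above is the subtle part of this patch, so here is a minimal, self-contained sketch of that retry-once pattern in isolation: a read through a borrowed (cached) file handle gets exactly one retry after the handle is reopened, while a read through an exclusive handle fails immediately. Handle, ReadChunk, Reopen, and ReadWithRetry are hypothetical stand-ins, not Impala or libhdfs APIs.

// Sketch only: models the retry-once-on-borrowed-handle logic, under the
// assumption that a "stale" handle is the only failure mode.
#include <cstdio>
#include <optional>

struct Handle { bool stale; };

// Simulated read: returns bytes read, or -1 if the handle has gone bad.
int ReadChunk(const Handle& h, int chunk) { return h.stale ? -1 : chunk; }

// Reopening replaces a bad handle with a fresh one.
void Reopen(Handle& h) { h.stale = false; }

std::optional<int> ReadWithRetry(Handle& h, bool borrowed, int chunk) {
  int num_retries = 0;
  while (true) {
    int n = ReadChunk(h, chunk);
    // Do not retry if the read succeeded, the handle is exclusive
    // (not borrowed), or we already retried once.
    if (n != -1 || !borrowed || num_retries == 1) {
      return n == -1 ? std::nullopt : std::optional<int>(n);
    }
    ++num_retries;
    Reopen(h);  // The failure may be a bad cached handle; reopen and retry.
  }
}

int main() {
  Handle borrowed{/*stale=*/true};
  std::printf("borrowed: %s\n",
      ReadWithRetry(borrowed, true, 128 * 1024) ? "recovered after reopen"
                                                : "failed");
  Handle exclusive{/*stale=*/true};
  std::printf("exclusive: %s\n",
      ReadWithRetry(exclusive, false, 128 * 1024) ? "ok" : "failed fast");
  return 0;
}

The design point this isolates: only borrowed handles are retried, because only they can have silently gone stale in the cache; an exclusive handle was opened by this scan range moments earlier, so a failure there is reported immediately.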
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 308b2c4..37219cc 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -260,7 +260,7 @@ CatalogServiceClientCache* RuntimeState::catalogd_client_cache() {
   return exec_env_->catalogd_client_cache();
 }
 
-DiskIoMgr* RuntimeState::io_mgr() {
+io::DiskIoMgr* RuntimeState::io_mgr() {
   return exec_env_->disk_io_mgr();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 74c27e5..4eb3e10 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -35,7 +35,6 @@ namespace impala {
 class BufferPool;
 class DataStreamRecvr;
 class DescriptorTbl;
-class DiskIoMgr;
 class Expr;
 class LlvmCodeGen;
 class MemTracker;
@@ -53,6 +52,10 @@ class TPlanFragmentCtx;
 class TPlanFragmentInstanceCtx;
 class QueryState;
 
+namespace io {
+  class DiskIoMgr;
+}
+
 /// TODO: move the typedefs into a separate .h (and fix the includes for that)
 
 /// Counts how many rows an INSERT query has added to a particular partition
@@ -124,7 +127,7 @@ class RuntimeState {
   HBaseTableFactory* htable_factory();
   ImpalaBackendClientCache* impalad_client_cache();
   CatalogServiceClientCache* catalogd_client_cache();
-  DiskIoMgr* io_mgr();
+  io::DiskIoMgr* io_mgr();
   MemTracker* instance_mem_tracker() { return instance_mem_tracker_.get(); }
   MemTracker* query_mem_tracker();  // reference to the query_state_'s memtracker
   ReservationTracker* instance_buffer_reservation() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/test-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.h b/be/src/runtime/test-env.h
index e721510..5fb9a1c 100644
--- a/be/src/runtime/test-env.h
+++ b/be/src/runtime/test-env.h
@@ -18,7 +18,7 @@
 #ifndef IMPALA_RUNTIME_TEST_ENV
 #define IMPALA_RUNTIME_TEST_ENV
 
-#include "runtime/disk-io-mgr.h"
+#include "runtime/io/disk-io-mgr.h"
 #include "runtime/exec-env.h"
 #include "runtime/fragment-instance-state.h"
 #include "runtime/mem-tracker.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/tmp-file-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index dde6348..fbc0a36 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -47,6 +47,8 @@ DECLARE_int32(stress_scratch_write_delay_ms);
 
 namespace impala {
 
+using namespace io;
+
 class TmpFileMgrTest : public ::testing::Test {
  public:
   virtual void SetUp() {
@@ -130,7 +132,7 @@ class TmpFileMgrTest : public ::testing::Test {
     group->next_allocation_index_ = value;
   }
 
-  /// Helper to cancel the FileGroup DiskIoRequestContext.
+  /// Helper to cancel the FileGroup RequestContext.
  static void CancelIoContext(TmpFileMgr::FileGroup* group) {
    group->io_mgr_->CancelContext(group->io_ctx_.get());
  }
@@ -404,7 +406,7 @@ TEST_F(TmpFileMgrTest, TestScratchRangeRecycling) {
     std::iota(data[i].begin(), data[i].end(), i);
   }
 
-  DiskIoMgr::WriteRange::WriteDoneCallback callback =
+  WriteRange::WriteDoneCallback callback =
       bind(mem_fn(&TmpFileMgrTest::SignalCallback), this, _1);
   vector<unique_ptr<TmpFileMgr::WriteHandle>> handles(BLOCKS);
   // 'file_group' should allocate extra scratch bytes for this 'alloc_size'.
@@ -449,7 +451,7 @@ TEST_F(TmpFileMgrTest, TestProcessMemLimitExceeded) {
   CancelIoContext(&file_group);
 
   // After this error, writing via the file group should fail.
-  DiskIoMgr::WriteRange::WriteDoneCallback callback =
+  WriteRange::WriteDoneCallback callback =
       bind(mem_fn(&TmpFileMgrTest::SignalCallback), this, _1);
   unique_ptr<TmpFileMgr::WriteHandle> handle;
   Status status = file_group.Write(MemRange(data.data(), DATA_SIZE), callback, &handle);
@@ -483,7 +485,7 @@ TEST_F(TmpFileMgrTest, TestEncryptionDuringCancellation) {
 
   // Start a write in flight, which should encrypt the data and write it to disk.
   unique_ptr<TmpFileMgr::WriteHandle> handle;
-  DiskIoMgr::WriteRange::WriteDoneCallback callback =
+  WriteRange::WriteDoneCallback callback =
      bind(mem_fn(&TmpFileMgrTest::SignalCallback), this, _1);
   ASSERT_OK(file_group.Write(data_mem_range, callback, &handle));
   string file_path = handle->TmpFilePath();
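For orientation, the WriteRange lifecycle these test changes exercise goes: construct the range with a file, offset, disk id, and WriteDoneCallback; attach the payload with SetData(); hand the range to the IoMgr; and receive the completion status through the callback, with no locks held. The sketch below mirrors that flow with simplified stand-in types so it compiles standalone: Status here is a two-line mock of impala::Status, and FakeAddWriteRange is a hypothetical synchronous stand-in for DiskIoMgr::AddWriteRange(); none of this is the Impala API itself.

#include <cstdint>
#include <functional>
#include <iostream>
#include <string>
#include <utility>

// Simplified stand-in for impala::Status.
struct Status {
  bool ok_ = true;
  bool ok() const { return ok_; }
  static Status OK() { return Status(); }
};

class WriteRange;
Status FakeAddWriteRange(WriteRange* range);

// Simplified stand-in for impala::io::WriteRange.
class WriteRange {
 public:
  // Invoked once per range after the write completes or the context is
  // cancelled, with no locks held.
  using WriteDoneCallback = std::function<void(const Status&)>;

  WriteRange(std::string file, int64_t file_offset, int disk_id,
      WriteDoneCallback callback)
    : file_(std::move(file)), offset_(file_offset), disk_id_(disk_id),
      callback_(std::move(callback)) {}

  // May only be called while no write is in flight, i.e. before the range
  // is added or after its callback has run.
  void SetData(const uint8_t* buffer, int64_t len) { data_ = buffer; len_ = len; }

 private:
  friend Status FakeAddWriteRange(WriteRange* range);

  std::string file_;
  int64_t offset_;
  int disk_id_;
  const uint8_t* data_ = nullptr;
  int64_t len_ = 0;
  WriteDoneCallback callback_;
};

// Hypothetical stand-in for DiskIoMgr::AddWriteRange(): the real IoMgr
// queues the range on a disk thread and invokes the callback
// asynchronously; here the "write" and the callback run synchronously.
Status FakeAddWriteRange(WriteRange* range) {
  if (range->data_ == nullptr) return Status{false};
  std::cout << "writing " << range->len_ << " bytes to " << range->file_
            << " at offset " << range->offset_ << " on disk "
            << range->disk_id_ << std::endl;
  range->callback_(Status::OK());
  return Status::OK();
}

int main() {
  uint8_t block[4096] = {};
  WriteRange range("/tmp/impala-scratch/block-0", /*file_offset=*/0, /*disk_id=*/0,
      [](const Status& s) {
        std::cout << (s.ok() ? "write done" : "write failed") << std::endl;
      });
  range.SetData(block, sizeof(block));
  return FakeAddWriteRange(&range).ok() ? 0 : 1;
}

This also shows why the tests above only had to change the callback's type name: moving WriteRange out of DiskIoMgr into the io namespace leaves the construct/SetData/callback contract untouched.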