http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/io/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
new file mode 100644
index 0000000..f9aed92
--- /dev/null
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -0,0 +1,1191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "common/global-flags.h"
+#include "runtime/io/disk-io-mgr.h"
+#include "runtime/io/disk-io-mgr-internal.h"
+#include "runtime/io/handle-cache.inline.h"
+
+#include <boost/algorithm/string.hpp>
+
+#include "gutil/strings/substitute.h"
+#include "util/bit-util.h"
+#include "util/hdfs-util.h"
+#include "util/time.h"
+
+DECLARE_bool(disable_mem_pools);
+#ifndef NDEBUG
+DECLARE_int32(stress_scratch_write_delay_ms);
+#endif
+
+#include "common/names.h"
+
+using namespace impala;
+using namespace impala::io;
+using namespace strings;
+
+// Control the number of disks on the machine. If 0, this comes from the system
+// settings.
+DEFINE_int32(num_disks, 0, "Number of disks on data node.");
+// Default IoMgr configs:
+// The maximum number of threads per disk is also the max queue depth per disk.
+DEFINE_int32(num_threads_per_disk, 0, "Number of I/O threads per disk");
+
+// Rotational disks should have 1 thread per disk to minimize seeks. Non-rotational
+// disks don't have this penalty and benefit from multiple concurrent IO requests.
+static const int THREADS_PER_ROTATIONAL_DISK = 1;
+static const int THREADS_PER_SOLID_STATE_DISK = 8;
+
+// The maximum number of threads per rotational disk is also the max queue depth per
+// rotational disk.
+static const string num_io_threads_per_rotational_disk_help_msg = Substitute("Number of "
+    "I/O threads per rotational disk. Has priority over num_threads_per_disk. If neither"
+    " is set, defaults to $0 thread(s) per rotational disk", THREADS_PER_ROTATIONAL_DISK);
+DEFINE_int32(num_io_threads_per_rotational_disk, 0,
+    num_io_threads_per_rotational_disk_help_msg.c_str());
+// The maximum number of threads per solid state disk is also the max queue depth per
+// solid state disk.
+static const string num_io_threads_per_solid_state_disk_help_msg = Substitute("Number of"
+    " I/O threads per solid state disk. Has priority over num_threads_per_disk. If "
+    "neither is set, defaults to $0 thread(s) per solid state disk",
+    THREADS_PER_SOLID_STATE_DISK);
+DEFINE_int32(num_io_threads_per_solid_state_disk, 0,
+    num_io_threads_per_solid_state_disk_help_msg.c_str());
+// The maximum number of remote HDFS I/O threads. HDFS accesses that are expected to be
+// remote are placed on a separate remote disk queue. This is the queue depth for that
+// queue. If 0, then the remote queue is not used and instead ranges are round-robined
+// across the local disk queues.
+DEFINE_int32(num_remote_hdfs_io_threads, 8, "Number of remote HDFS I/O threads");
+// The maximum number of S3 I/O threads. The default value of 16 was chosen empirically
+// to maximize S3 throughput. Maximum throughput is achieved with multiple connections
+// open to S3 and use of multiple CPU cores since S3 reads are relatively compute
+// expensive (SSL and JNI buffer overheads).
+DEFINE_int32(num_s3_io_threads, 16, "Number of S3 I/O threads");
+// The maximum number of ADLS I/O threads. This number is a good default to have for
+// clusters that may vary widely in size, due to an undocumented concurrency limit
+// enforced by ADLS for a cluster, which is somewhere between 500 and 700. For smaller
+// clusters (~10 nodes), 64 threads would be closer to ideal.
+DEFINE_int32(num_adls_io_threads, 16, "Number of ADLS I/O threads");
+
+DECLARE_int64(min_buffer_size);
+
+// With 1024B through 8MB buffers, this is up to ~2GB of buffers.
+DEFINE_int32(max_free_io_buffers, 128,
+    "For each io buffer size, the maximum number of buffers the IoMgr will hold onto");
+
+// The number of cached file handles defines how much memory can be used per backend for
+// caching frequently used file handles. Measurements indicate that a single file handle
+// uses about 6kB of memory. 20k file handles will thus reserve ~120MB of memory.
+// The actual amount of memory that is associated with a file handle can be larger
+// or smaller, depending on the replication factor for this file or the path name.
+DEFINE_uint64(max_cached_file_handles, 20000, "Maximum number of HDFS file handles "
+    "that will be cached. Disabled if set to 0.");
+
+// The unused file handle timeout specifies how long a file handle will remain in the
+// cache if it is not being used. Aging out unused handles ensures that the cache is not
+// wasting memory on handles that aren't useful. This allows users to specify a larger
+// cache size, as the system will only use the memory on useful file handles.
+// Additionally, cached file handles keep an open file descriptor for local files.
+// If a file is deleted through HDFS, this open file descriptor can keep the disk space
+// from being freed. When the metadata sees that a file has been deleted, the file handle
+// will no longer be used by future queries. Aging out this file handle allows the
+// disk space to be freed in an appropriate period of time.
+DEFINE_uint64(unused_file_handle_timeout_sec, 21600, "Maximum time, in seconds, that an "
+    "unused HDFS file handle will remain in the file handle cache. Disabled if set "
+    "to 0.");
+
+// The IoMgr is able to run with a wide range of memory usage. If a query has less
+// memory remaining than this value, the IoMgr will stop all buffering regardless of the
+// current queue size.
+static const int LOW_MEMORY = 64 * 1024 * 1024; + +const int DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT; + +AtomicInt32 DiskIoMgr::next_disk_id_; + +namespace detail { +// Indicates if file handle caching should be used +static inline bool is_file_handle_caching_enabled() { + return FLAGS_max_cached_file_handles > 0; +} +} + +string DiskIoMgr::DebugString() { + stringstream ss; + ss << "Disks: " << endl; + for (int i = 0; i < disk_queues_.size(); ++i) { + unique_lock<mutex> lock(disk_queues_[i]->lock); + ss << " " << (void*) disk_queues_[i] << ":" ; + if (!disk_queues_[i]->request_contexts.empty()) { + ss << " Readers: "; + for (RequestContext* req_context: disk_queues_[i]->request_contexts) { + ss << (void*)req_context; + } + } + ss << endl; + } + return ss.str(); +} + +BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr, + RequestContext* reader, ScanRange* scan_range, uint8_t* buffer, + int64_t buffer_len, MemTracker* mem_tracker) + : io_mgr_(io_mgr), + reader_(reader), + mem_tracker_(mem_tracker), + scan_range_(scan_range), + buffer_(buffer), + buffer_len_(buffer_len) { + DCHECK(io_mgr != nullptr); + DCHECK(scan_range != nullptr); + DCHECK(buffer != nullptr); + DCHECK_GE(buffer_len, 0); + DCHECK_NE(scan_range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER, + mem_tracker == nullptr); +} + +void BufferDescriptor::TransferOwnership(MemTracker* dst) { + DCHECK(dst != nullptr); + DCHECK(!is_client_buffer()); + // Memory of cached buffers is not tracked against a tracker. + if (is_cached()) return; + DCHECK(mem_tracker_ != nullptr); + dst->Consume(buffer_len_); + mem_tracker_->Release(buffer_len_); + mem_tracker_ = dst; +} + +WriteRange::WriteRange( + const string& file, int64_t file_offset, int disk_id, WriteDoneCallback callback) + : RequestRange(RequestType::WRITE), callback_(callback) { + SetRange(file, file_offset, disk_id); +} + +void WriteRange::SetRange( + const std::string& file, int64_t file_offset, int disk_id) { + file_ = file; + offset_ = file_offset; + disk_id_ = disk_id; +} + +void WriteRange::SetData(const uint8_t* buffer, int64_t len) { + data_ = buffer; + len_ = len; +} + +static void CheckSseSupport() { + if (!CpuInfo::IsSupported(CpuInfo::SSE4_2)) { + LOG(WARNING) << "This machine does not support sse4_2. The default IO system " + "configurations are suboptimal for this hardware. Consider " + "increasing the number of threads per disk by restarting impalad " + "using the --num_threads_per_disk flag with a higher value"; + } +} + +// Utility function to select flag that is set (has a positive value) based on precedence +static inline int GetFirstPositiveVal(const int first_val, const int second_val, + const int default_val) { + return first_val > 0 ? first_val : (second_val > 0 ? 
second_val : default_val); +} + +DiskIoMgr::DiskIoMgr() : + num_io_threads_per_rotational_disk_(GetFirstPositiveVal( + FLAGS_num_io_threads_per_rotational_disk, FLAGS_num_threads_per_disk, + THREADS_PER_ROTATIONAL_DISK)), + num_io_threads_per_solid_state_disk_(GetFirstPositiveVal( + FLAGS_num_io_threads_per_solid_state_disk, FLAGS_num_threads_per_disk, + THREADS_PER_SOLID_STATE_DISK)), + max_buffer_size_(FLAGS_read_size), + min_buffer_size_(FLAGS_min_buffer_size), + shut_down_(false), + total_bytes_read_counter_(TUnit::BYTES), + read_timer_(TUnit::TIME_NS), + file_handle_cache_(min(FLAGS_max_cached_file_handles, + FileSystemUtil::MaxNumFileHandles()), + FLAGS_unused_file_handle_timeout_sec) { + DCHECK_LE(READ_SIZE_MIN_VALUE, FLAGS_read_size); + int64_t max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, min_buffer_size_); + free_buffers_.resize(BitUtil::Log2Ceiling64(max_buffer_size_scaled) + 1); + int num_local_disks = DiskInfo::num_disks(); + if (FLAGS_num_disks < 0 || FLAGS_num_disks > DiskInfo::num_disks()) { + LOG(WARNING) << "Number of disks specified should be between 0 and the number of " + "logical disks on the system. Defaulting to system setting of " << + DiskInfo::num_disks() << " disks"; + } else if (FLAGS_num_disks > 0) { + num_local_disks = FLAGS_num_disks; + } + disk_queues_.resize(num_local_disks + REMOTE_NUM_DISKS); + CheckSseSupport(); +} + +DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_rotational_disk, + int threads_per_solid_state_disk, int min_buffer_size, int max_buffer_size) : + num_io_threads_per_rotational_disk_(threads_per_rotational_disk), + num_io_threads_per_solid_state_disk_(threads_per_solid_state_disk), + max_buffer_size_(max_buffer_size), + min_buffer_size_(min_buffer_size), + shut_down_(false), + total_bytes_read_counter_(TUnit::BYTES), + read_timer_(TUnit::TIME_NS), + file_handle_cache_(min(FLAGS_max_cached_file_handles, + FileSystemUtil::MaxNumFileHandles()), + FLAGS_unused_file_handle_timeout_sec) { + int64_t max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, min_buffer_size_); + free_buffers_.resize(BitUtil::Log2Ceiling64(max_buffer_size_scaled) + 1); + if (num_local_disks == 0) num_local_disks = DiskInfo::num_disks(); + disk_queues_.resize(num_local_disks + REMOTE_NUM_DISKS); + CheckSseSupport(); +} + +DiskIoMgr::~DiskIoMgr() { + shut_down_ = true; + // Notify all worker threads and shut them down. + for (int i = 0; i < disk_queues_.size(); ++i) { + if (disk_queues_[i] == nullptr) continue; + { + // This lock is necessary to properly use the condition var to notify + // the disk worker threads. The readers also grab this lock so updates + // to shut_down_ are protected. 
+ unique_lock<mutex> disk_lock(disk_queues_[i]->lock); + } + disk_queues_[i]->work_available.NotifyAll(); + } + disk_thread_group_.JoinAll(); + + for (int i = 0; i < disk_queues_.size(); ++i) { + if (disk_queues_[i] == nullptr) continue; + int disk_id = disk_queues_[i]->disk_id; + for (list<RequestContext*>::iterator it = disk_queues_[i]->request_contexts.begin(); + it != disk_queues_[i]->request_contexts.end(); ++it) { + DCHECK_EQ((*it)->disk_states_[disk_id].num_threads_in_op(), 0); + DCHECK((*it)->disk_states_[disk_id].done()); + (*it)->DecrementDiskRefCount(); + } + } + + DCHECK_EQ(num_buffers_in_readers_.Load(), 0); + + // Delete all allocated buffers + int num_free_buffers = 0; + for (int idx = 0; idx < free_buffers_.size(); ++idx) { + num_free_buffers += free_buffers_[idx].size(); + } + DCHECK_EQ(num_allocated_buffers_.Load(), num_free_buffers); + GcIoBuffers(); + + for (int i = 0; i < disk_queues_.size(); ++i) { + delete disk_queues_[i]; + } + + if (free_buffer_mem_tracker_ != nullptr) free_buffer_mem_tracker_->Close(); + if (cached_read_options_ != nullptr) hadoopRzOptionsFree(cached_read_options_); +} + +Status DiskIoMgr::Init(MemTracker* process_mem_tracker) { + DCHECK(process_mem_tracker != nullptr); + free_buffer_mem_tracker_.reset( + new MemTracker(-1, "Free Disk IO Buffers", process_mem_tracker, false)); + + for (int i = 0; i < disk_queues_.size(); ++i) { + disk_queues_[i] = new DiskQueue(i); + int num_threads_per_disk; + if (i == RemoteDfsDiskId()) { + num_threads_per_disk = FLAGS_num_remote_hdfs_io_threads; + } else if (i == RemoteS3DiskId()) { + num_threads_per_disk = FLAGS_num_s3_io_threads; + } else if (i == RemoteAdlsDiskId()) { + num_threads_per_disk = FLAGS_num_adls_io_threads; + } else if (DiskInfo::is_rotational(i)) { + num_threads_per_disk = num_io_threads_per_rotational_disk_; + } else { + num_threads_per_disk = num_io_threads_per_solid_state_disk_; + } + for (int j = 0; j < num_threads_per_disk; ++j) { + stringstream ss; + ss << "work-loop(Disk: " << i << ", Thread: " << j << ")"; + std::unique_ptr<Thread> t; + RETURN_IF_ERROR(Thread::Create("disk-io-mgr", ss.str(), &DiskIoMgr::WorkLoop, + this, disk_queues_[i], &t)); + disk_thread_group_.AddThread(move(t)); + } + } + RETURN_IF_ERROR(file_handle_cache_.Init()); + + cached_read_options_ = hadoopRzOptionsAlloc(); + DCHECK(cached_read_options_ != nullptr); + // Disable checksumming for cached reads. + int ret = hadoopRzOptionsSetSkipChecksum(cached_read_options_, true); + DCHECK_EQ(ret, 0); + // Disable automatic fallback for cached reads. + ret = hadoopRzOptionsSetByteBufferPool(cached_read_options_, nullptr); + DCHECK_EQ(ret, 0); + + return Status::OK(); +} + +unique_ptr<RequestContext> DiskIoMgr::RegisterContext(MemTracker* mem_tracker) { + return unique_ptr<RequestContext>( + new RequestContext(this, num_total_disks(), mem_tracker)); +} + +void DiskIoMgr::UnregisterContext(RequestContext* reader) { + reader->CancelAndMarkInactive(); +} + +// Cancellation requires coordination from multiple threads. Each thread that currently +// has a reference to the request context must notice the cancel and remove it from its +// tracking structures. The last thread to touch the context should deallocate (aka +// recycle) the request context object. Potential threads are: +// 1. Disk threads that are currently reading for this reader. +// 2. Caller threads that are waiting in GetNext. +// +// The steps are: +// 1. Cancel will immediately set the context in the Cancelled state. 
This prevents any +// other thread from adding more ready buffers to the context (they all take a lock and +// check the state before doing so), or any write ranges to the context. +// 2. Cancel will call cancel on each ScanRange that is not yet complete, unblocking +// any threads in GetNext(). The reader will see the cancelled Status returned. Cancel +// also invokes the callback for the WriteRanges with the cancelled state. +// 3. Disk threads notice the context is cancelled either when picking the next context +// to process or when they try to enqueue a ready buffer. Upon noticing the cancelled +// state, removes the context from the disk queue. The last thread per disk with an +// outstanding reference to the context decrements the number of disk queues the context +// is on. +void DiskIoMgr::CancelContext(RequestContext* context) { + context->Cancel(Status::CANCELLED); +} + +void DiskIoMgr::set_read_timer(RequestContext* r, RuntimeProfile::Counter* c) { + r->read_timer_ = c; +} + +void DiskIoMgr::set_bytes_read_counter(RequestContext* r, RuntimeProfile::Counter* c) { + r->bytes_read_counter_ = c; +} + +void DiskIoMgr::set_active_read_thread_counter(RequestContext* r, + RuntimeProfile::Counter* c) { + r->active_read_thread_counter_ = c; +} + +void DiskIoMgr::set_disks_access_bitmap(RequestContext* r, + RuntimeProfile::Counter* c) { + r->disks_accessed_bitmap_ = c; +} + +int64_t DiskIoMgr::queue_size(RequestContext* reader) const { + return reader->num_ready_buffers_.Load(); +} + +Status DiskIoMgr::context_status(RequestContext* context) const { + unique_lock<mutex> lock(context->lock_); + return context->status_; +} + +int64_t DiskIoMgr::bytes_read_local(RequestContext* reader) const { + return reader->bytes_read_local_.Load(); +} + +int64_t DiskIoMgr::bytes_read_short_circuit(RequestContext* reader) const { + return reader->bytes_read_short_circuit_.Load(); +} + +int64_t DiskIoMgr::bytes_read_dn_cache(RequestContext* reader) const { + return reader->bytes_read_dn_cache_.Load(); +} + +int DiskIoMgr::num_remote_ranges(RequestContext* reader) const { + return reader->num_remote_ranges_.Load(); +} + +int64_t DiskIoMgr::unexpected_remote_bytes(RequestContext* reader) const { + return reader->unexpected_remote_bytes_.Load(); +} + +int DiskIoMgr::cached_file_handles_hit_count(RequestContext* reader) const { + return reader->cached_file_handles_hit_count_.Load(); +} + +int DiskIoMgr::cached_file_handles_miss_count(RequestContext* reader) const { + return reader->cached_file_handles_miss_count_.Load(); +} + +int64_t DiskIoMgr::GetReadThroughput() { + return RuntimeProfile::UnitsPerSecond(&total_bytes_read_counter_, &read_timer_); +} + +Status DiskIoMgr::ValidateScanRange(ScanRange* range) { + int disk_id = range->disk_id_; + if (disk_id < 0 || disk_id >= disk_queues_.size()) { + return Status(TErrorCode::DISK_IO_ERROR, + Substitute("Invalid scan range. Bad disk id: $0", disk_id)); + } + if (range->offset_ < 0) { + return Status(TErrorCode::DISK_IO_ERROR, + Substitute("Invalid scan range. Negative offset $0", range->offset_)); + } + if (range->len_ < 0) { + return Status(TErrorCode::DISK_IO_ERROR, + Substitute("Invalid scan range. 
Negative length $0", range->len_)); + } + return Status::OK(); +} + +Status DiskIoMgr::AddScanRanges(RequestContext* reader, + const vector<ScanRange*>& ranges, bool schedule_immediately) { + if (ranges.empty()) return Status::OK(); + + // Validate and initialize all ranges + for (int i = 0; i < ranges.size(); ++i) { + RETURN_IF_ERROR(ValidateScanRange(ranges[i])); + ranges[i]->InitInternal(this, reader); + } + + // disks that this reader needs to be scheduled on. + unique_lock<mutex> reader_lock(reader->lock_); + DCHECK(reader->Validate()) << endl << reader->DebugString(); + + if (reader->state_ == RequestContext::Cancelled) { + DCHECK(!reader->status_.ok()); + return reader->status_; + } + + // Add each range to the queue of the disk the range is on + for (int i = 0; i < ranges.size(); ++i) { + // Don't add empty ranges. + DCHECK_NE(ranges[i]->len(), 0); + ScanRange* range = ranges[i]; + + if (range->try_cache_) { + if (schedule_immediately) { + bool cached_read_succeeded; + RETURN_IF_ERROR(range->ReadFromCache(reader_lock, &cached_read_succeeded)); + if (cached_read_succeeded) continue; + // Cached read failed, fall back to AddRequestRange() below. + } else { + reader->cached_ranges_.Enqueue(range); + continue; + } + } + reader->AddRequestRange(range, schedule_immediately); + } + DCHECK(reader->Validate()) << endl << reader->DebugString(); + + return Status::OK(); +} + +Status DiskIoMgr::AddScanRange( + RequestContext* reader, ScanRange* range, bool schedule_immediately) { + return AddScanRanges(reader, vector<ScanRange*>({range}), schedule_immediately); +} + +// This function returns the next scan range the reader should work on, checking +// for eos and error cases. If there isn't already a cached scan range or a scan +// range prepared by the disk threads, the caller waits on the disk threads. +Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) { + DCHECK(reader != nullptr); + DCHECK(range != nullptr); + *range = nullptr; + Status status = Status::OK(); + + unique_lock<mutex> reader_lock(reader->lock_); + DCHECK(reader->Validate()) << endl << reader->DebugString(); + + while (true) { + if (reader->state_ == RequestContext::Cancelled) { + DCHECK(!reader->status_.ok()); + status = reader->status_; + break; + } + + if (reader->num_unstarted_scan_ranges_.Load() == 0 && + reader->ready_to_start_ranges_.empty() && reader->cached_ranges_.empty()) { + // All ranges are done, just return. + break; + } + + if (!reader->cached_ranges_.empty()) { + // We have a cached range. + *range = reader->cached_ranges_.Dequeue(); + DCHECK((*range)->try_cache_); + bool cached_read_succeeded; + RETURN_IF_ERROR((*range)->ReadFromCache(reader_lock, &cached_read_succeeded)); + if (cached_read_succeeded) return Status::OK(); + + // This range ended up not being cached. Loop again and pick up a new range. + reader->AddRequestRange(*range, false); + DCHECK(reader->Validate()) << endl << reader->DebugString(); + *range = nullptr; + continue; + } + + if (reader->ready_to_start_ranges_.empty()) { + reader->ready_to_start_ranges_cv_.Wait(reader_lock); + } else { + *range = reader->ready_to_start_ranges_.Dequeue(); + DCHECK(*range != nullptr); + int disk_id = (*range)->disk_id(); + DCHECK_EQ(*range, reader->disk_states_[disk_id].next_scan_range_to_start()); + // Set this to nullptr, the next time this disk runs for this reader, it will + // get another range ready. 
+      reader->disk_states_[disk_id].set_next_scan_range_to_start(nullptr);
+      reader->ScheduleScanRange(*range);
+      break;
+    }
+  }
+  return status;
+}
+
+Status DiskIoMgr::Read(RequestContext* reader,
+    ScanRange* range, std::unique_ptr<BufferDescriptor>* buffer) {
+  DCHECK(range != nullptr);
+  DCHECK(buffer != nullptr);
+  *buffer = nullptr;
+
+  if (range->len() > max_buffer_size_
+      && range->external_buffer_tag_ != ScanRange::ExternalBufferTag::CLIENT_BUFFER) {
+    return Status(TErrorCode::DISK_IO_ERROR, Substitute("Internal error: cannot "
+        "perform sync read of '$0' bytes that is larger than the max read buffer size "
+        "'$1'.", range->len(), max_buffer_size_));
+  }
+
+  vector<ScanRange*> ranges;
+  ranges.push_back(range);
+  RETURN_IF_ERROR(AddScanRanges(reader, ranges, true));
+  RETURN_IF_ERROR(range->GetNext(buffer));
+  DCHECK((*buffer) != nullptr);
+  DCHECK((*buffer)->eosr());
+  return Status::OK();
+}
+
+void DiskIoMgr::ReturnBuffer(unique_ptr<BufferDescriptor> buffer_desc) {
+  DCHECK(buffer_desc != nullptr);
+  if (!buffer_desc->status_.ok()) DCHECK(buffer_desc->buffer_ == nullptr);
+
+  RequestContext* reader = buffer_desc->reader_;
+  if (buffer_desc->buffer_ != nullptr) {
+    if (!buffer_desc->is_cached() && !buffer_desc->is_client_buffer()) {
+      // Buffers that were not allocated by DiskIoMgr don't need to be freed.
+      FreeBufferMemory(buffer_desc.get());
+    }
+    buffer_desc->buffer_ = nullptr;
+    num_buffers_in_readers_.Add(-1);
+    reader->num_buffers_in_reader_.Add(-1);
+  } else {
+    // A nullptr buffer means there was an error in which case there is no buffer
+    // to return.
+  }
+
+  if (buffer_desc->eosr_ || buffer_desc->scan_range_->is_cancelled_) {
+    // Need to close the scan range if returning the last buffer or the scan range
+    // has been cancelled (and the caller might never get the last buffer).
+    // Close() is idempotent so multiple cancelled buffers are okay.
+    buffer_desc->scan_range_->Close();
+  }
+}
+
+unique_ptr<BufferDescriptor> DiskIoMgr::GetFreeBuffer(
+    RequestContext* reader, ScanRange* range, int64_t buffer_size) {
+  DCHECK_LE(buffer_size, max_buffer_size_);
+  DCHECK_GT(buffer_size, 0);
+  buffer_size = min(static_cast<int64_t>(max_buffer_size_), buffer_size);
+  int idx = free_buffers_idx(buffer_size);
+  // Quantize buffer size to the nearest power of 2 greater than or equal to the
+  // specified buffer size and convert to bytes.
+  buffer_size = (1LL << idx) * min_buffer_size_;
+
+  // Track memory against the reader. This is checked the next time we start
+  // a read for the next reader in DiskIoMgr::GetNextScanRange().
+  DCHECK(reader->mem_tracker_ != nullptr);
+  reader->mem_tracker_->Consume(buffer_size);
+
+  uint8_t* buffer = nullptr;
+  {
+    unique_lock<mutex> lock(free_buffers_lock_);
+    if (free_buffers_[idx].empty()) {
+      num_allocated_buffers_.Add(1);
+      if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != nullptr) {
+        ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(1L);
+      }
+      if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != nullptr) {
+        ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(buffer_size);
+      }
+      // We already tracked this memory against the reader's MemTracker.
+      buffer = new uint8_t[buffer_size];
+    } else {
+      if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != nullptr) {
+        ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(-1L);
+      }
+      buffer = free_buffers_[idx].front();
+      free_buffers_[idx].pop_front();
+      free_buffer_mem_tracker_->Release(buffer_size);
+    }
+  }
+
+  // Validate more invariants.
+ DCHECK(range != nullptr); + DCHECK(reader != nullptr); + DCHECK(buffer != nullptr); + return unique_ptr<BufferDescriptor>(new BufferDescriptor( + this, reader, range, buffer, buffer_size, reader->mem_tracker_)); +} + +void DiskIoMgr::GcIoBuffers(int64_t bytes_to_free) { + unique_lock<mutex> lock(free_buffers_lock_); + int buffers_freed = 0; + int bytes_freed = 0; + // Free small-to-large to avoid retaining many small buffers and fragmenting memory. + for (int idx = 0; idx < free_buffers_.size(); ++idx) { + deque<uint8_t*>* free_buffers = &free_buffers_[idx]; + while ( + !free_buffers->empty() && (bytes_to_free == -1 || bytes_freed <= bytes_to_free)) { + uint8_t* buffer = free_buffers->front(); + free_buffers->pop_front(); + int64_t buffer_size = (1LL << idx) * min_buffer_size_; + delete[] buffer; + free_buffer_mem_tracker_->Release(buffer_size); + num_allocated_buffers_.Add(-1); + + ++buffers_freed; + bytes_freed += buffer_size; + } + if (bytes_to_free != -1 && bytes_freed >= bytes_to_free) break; + } + + if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != nullptr) { + ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(-buffers_freed); + } + if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != nullptr) { + ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(-bytes_freed); + } + if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != nullptr) { + ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(-buffers_freed); + } +} + +void DiskIoMgr::FreeBufferMemory(BufferDescriptor* desc) { + DCHECK(!desc->is_cached()); + DCHECK(!desc->is_client_buffer()); + uint8_t* buffer = desc->buffer_; + int64_t buffer_size = desc->buffer_len_; + int idx = free_buffers_idx(buffer_size); + DCHECK_EQ(BitUtil::Ceil(buffer_size, min_buffer_size_) & ~(1LL << idx), 0) + << "buffer_size_ / min_buffer_size_ should be power of 2, got buffer_size = " + << buffer_size << ", min_buffer_size_ = " << min_buffer_size_; + + { + unique_lock<mutex> lock(free_buffers_lock_); + if (!FLAGS_disable_mem_pools && + free_buffers_[idx].size() < FLAGS_max_free_io_buffers) { + free_buffers_[idx].push_back(buffer); + if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != nullptr) { + ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(1L); + } + // This consume call needs to be protected by 'free_buffers_lock_' to avoid a race + // with a Release() call for the same buffer that could make consumption negative. + // Note: we can't use TryConsume(), which can indirectly call GcIoBuffers(). + // TODO: after IMPALA-3200 is completed, we should be able to leverage the buffer + // pool's free lists, and remove these free lists. + free_buffer_mem_tracker_->Consume(buffer_size); + } else { + num_allocated_buffers_.Add(-1); + delete[] buffer; + if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != nullptr) { + ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(-1L); + } + if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != nullptr) { + ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(-buffer_size); + } + } + } + + // We transferred the buffer ownership from the BufferDescriptor to the DiskIoMgr. + desc->mem_tracker_->Release(buffer_size); + desc->buffer_ = nullptr; +} + +// This function gets the next RequestRange to work on for this disk. It checks for +// cancellation and +// a) Updates ready_to_start_ranges if there are no scan ranges queued for this disk. +// b) Adds an unstarted write range to in_flight_ranges_. The write range is processed +// immediately if there are no preceding scan ranges in in_flight_ranges_ +// It blocks until work is available or the thread is shut down. 
+// Work is available if there is a RequestContext with
+//  - A ScanRange with a buffer available, or
+//  - A WriteRange in unstarted_write_ranges_.
+bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
+    RequestContext** request_context) {
+  int disk_id = disk_queue->disk_id;
+  *range = nullptr;
+
+  // This loop returns either with work to do or when the disk IoMgr shuts down.
+  while (true) {
+    *request_context = nullptr;
+    RequestContext::PerDiskState* request_disk_state = nullptr;
+    {
+      unique_lock<mutex> disk_lock(disk_queue->lock);
+
+      while (!shut_down_ && disk_queue->request_contexts.empty()) {
+        // Wait if there are no readers on the queue.
+        disk_queue->work_available.Wait(disk_lock);
+      }
+      if (shut_down_) break;
+      DCHECK(!disk_queue->request_contexts.empty());
+
+      // Get the next reader and remove the reader so that another disk thread
+      // can't pick it up. It will be enqueued before issuing the read to HDFS
+      // so this is not a big deal (i.e. multiple disk threads can read for the
+      // same reader).
+      // TODO: revisit.
+      *request_context = disk_queue->request_contexts.front();
+      disk_queue->request_contexts.pop_front();
+      DCHECK(*request_context != nullptr);
+      request_disk_state = &((*request_context)->disk_states_[disk_id]);
+      request_disk_state->IncrementRequestThreadAndDequeue();
+    }
+
+    // NOTE: no locks were taken in between. We need to be careful about what state
+    // of the reader and disk could have changed in between.
+    // There are some invariants here. Only one disk thread can have the
+    // same reader here (the reader is removed from the queue). There can be
+    // other disk threads operating on this reader in other functions though.
+
+    // We just picked a reader. Before we may allocate a buffer on its behalf, check that
+    // it has not exceeded any memory limits (e.g. the query or process limit).
+    // TODO: once IMPALA-3200 is fixed, we should be able to remove the free lists and
+    // move these memory limit checks to GetFreeBuffer().
+    // Note that calling AnyLimitExceeded() can result in a call to GcIoBuffers().
+    // TODO: IMPALA-3209: we should not force a reader over its memory limit by
+    // pushing more buffers to it. Most readers can make progress and operate within
+    // a fixed memory limit.
+    if ((*request_context)->mem_tracker_ != nullptr
+        && (*request_context)->mem_tracker_->AnyLimitExceeded()) {
+      (*request_context)->Cancel(Status::MemLimitExceeded());
+    }
+
+    unique_lock<mutex> request_lock((*request_context)->lock_);
+    VLOG_FILE << "Disk (id=" << disk_id << ") reading for "
+        << (*request_context)->DebugString();
+
+    // Check if reader has been cancelled.
+    if ((*request_context)->state_ == RequestContext::Cancelled) {
+      request_disk_state->DecrementRequestThreadAndCheckDone(*request_context);
+      continue;
+    }
+
+    DCHECK_EQ((*request_context)->state_, RequestContext::Active)
+        << (*request_context)->DebugString();
+
+    if (request_disk_state->next_scan_range_to_start() == nullptr &&
+        !request_disk_state->unstarted_scan_ranges()->empty()) {
+      // We don't have a range queued for this disk for what the caller should
+      // read next. Populate that. We want to have one range waiting to minimize
+      // wait time in GetNextRange.
+ ScanRange* new_range = request_disk_state->unstarted_scan_ranges()->Dequeue(); + (*request_context)->num_unstarted_scan_ranges_.Add(-1); + (*request_context)->ready_to_start_ranges_.Enqueue(new_range); + request_disk_state->set_next_scan_range_to_start(new_range); + + if ((*request_context)->num_unstarted_scan_ranges_.Load() == 0) { + // All the ranges have been started, notify everyone blocked on GetNextRange. + // Only one of them will get work so make sure to return nullptr to the other + // caller threads. + (*request_context)->ready_to_start_ranges_cv_.NotifyAll(); + } else { + (*request_context)->ready_to_start_ranges_cv_.NotifyOne(); + } + } + + // Always enqueue a WriteRange to be processed into in_flight_ranges_. + // This is done so in_flight_ranges_ does not exclusively contain ScanRanges. + // For now, enqueuing a WriteRange on each invocation of GetNextRequestRange() + // does not flood in_flight_ranges() with WriteRanges because the entire + // WriteRange is processed and removed from the queue after GetNextRequestRange() + // returns. (A DCHECK is used to ensure that writes do not exceed 8MB). + if (!request_disk_state->unstarted_write_ranges()->empty()) { + WriteRange* write_range = request_disk_state->unstarted_write_ranges()->Dequeue(); + request_disk_state->in_flight_ranges()->Enqueue(write_range); + } + + // Get the next scan range to work on from the reader. Only in_flight_ranges + // are eligible since the disk threads do not start new ranges on their own. + + // There are no inflight ranges, nothing to do. + if (request_disk_state->in_flight_ranges()->empty()) { + request_disk_state->DecrementRequestThread(); + continue; + } + DCHECK_GT(request_disk_state->num_remaining_ranges(), 0); + *range = request_disk_state->in_flight_ranges()->Dequeue(); + DCHECK(*range != nullptr); + + // Now that we've picked a request range, put the context back on the queue so + // another thread can pick up another request range for this context. + request_disk_state->ScheduleContext(*request_context, disk_id); + DCHECK((*request_context)->Validate()) << endl << (*request_context)->DebugString(); + return true; + } + + DCHECK(shut_down_); + return false; +} + +void DiskIoMgr::HandleWriteFinished( + RequestContext* writer, WriteRange* write_range, const Status& write_status) { + // Copy disk_id before running callback: the callback may modify write_range. + int disk_id = write_range->disk_id_; + + // Execute the callback before decrementing the thread count. Otherwise CancelContext() + // that waits for the disk ref count to be 0 will return, creating a race, e.g. see + // IMPALA-1890. + // The status of the write does not affect the status of the writer context. 
+  write_range->callback_(write_status);
+  {
+    unique_lock<mutex> writer_lock(writer->lock_);
+    DCHECK(writer->Validate()) << endl << writer->DebugString();
+    RequestContext::PerDiskState& state = writer->disk_states_[disk_id];
+    if (writer->state_ == RequestContext::Cancelled) {
+      state.DecrementRequestThreadAndCheckDone(writer);
+    } else {
+      state.DecrementRequestThread();
+    }
+    --state.num_remaining_ranges();
+  }
+}
+
+void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, RequestContext* reader,
+    unique_ptr<BufferDescriptor> buffer) {
+  unique_lock<mutex> reader_lock(reader->lock_);
+
+  RequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
+  DCHECK(reader->Validate()) << endl << reader->DebugString();
+  DCHECK_GT(state.num_threads_in_op(), 0);
+  DCHECK(buffer->buffer_ != nullptr);
+
+  if (reader->state_ == RequestContext::Cancelled) {
+    state.DecrementRequestThreadAndCheckDone(reader);
+    DCHECK(reader->Validate()) << endl << reader->DebugString();
+    if (!buffer->is_client_buffer()) FreeBufferMemory(buffer.get());
+    buffer->buffer_ = nullptr;
+    ScanRange* scan_range = buffer->scan_range_;
+    scan_range->Cancel(reader->status_);
+    // Enqueue the buffer to use the scan range's buffer cleanup path.
+    scan_range->EnqueueBuffer(reader_lock, move(buffer));
+    return;
+  }
+
+  DCHECK_EQ(reader->state_, RequestContext::Active);
+  DCHECK(buffer->buffer_ != nullptr);
+
+  // Update the reader's scan ranges. There are three cases here:
+  //  1. Read error
+  //  2. End of scan range
+  //  3. Middle of scan range
+  if (!buffer->status_.ok()) {
+    // Error case.
+    if (!buffer->is_client_buffer()) FreeBufferMemory(buffer.get());
+    buffer->buffer_ = nullptr;
+    buffer->eosr_ = true;
+    --state.num_remaining_ranges();
+    buffer->scan_range_->Cancel(buffer->status_);
+  } else if (buffer->eosr_) {
+    --state.num_remaining_ranges();
+  }
+
+  // After calling EnqueueBuffer(), it is no longer valid to read from buffer.
+  // Store the state we need before calling EnqueueBuffer().
+  bool eosr = buffer->eosr_;
+  ScanRange* scan_range = buffer->scan_range_;
+  bool is_cached = buffer->is_cached();
+  bool queue_full = scan_range->EnqueueBuffer(reader_lock, move(buffer));
+  if (eosr) {
+    // For cached buffers, we can't close the range until the cached buffer is returned.
+    // Close() is called from DiskIoMgr::ReturnBuffer().
+    if (!is_cached) scan_range->Close();
+  } else {
+    if (queue_full) {
+      reader->blocked_ranges_.Enqueue(scan_range);
+    } else {
+      reader->ScheduleScanRange(scan_range);
+    }
+  }
+  state.DecrementRequestThread();
+}
+
+void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
+  // The thread waits until there is work or the entire system is being shut down.
+  // If there is work, performs the read or write requested and re-enqueues the
+  // requesting context.
+  // Locks are not taken when reading from or writing to disk.
+  // The main loop has three parts:
+  //   1. GetNextRequestRange(): get the next request context (read or write) to
+  //      process and dequeue it.
+  //   2. For the dequeued request, gets the next scan- or write-range to process and
+  //      re-enqueues the request.
+  //   3. Perform the read or write as specified.
+  // Cancellation checking needs to happen in both steps 1 and 3.
+  while (true) {
+    RequestContext* worker_context = nullptr;
+    RequestRange* range = nullptr;
+
+    if (!GetNextRequestRange(disk_queue, &range, &worker_context)) {
+      DCHECK(shut_down_);
+      break;
+    }
+
+    if (range->request_type() == RequestType::READ) {
+      ReadRange(disk_queue, worker_context, static_cast<ScanRange*>(range));
+    } else {
+      DCHECK(range->request_type() == RequestType::WRITE);
+      Write(worker_context, static_cast<WriteRange*>(range));
+    }
+  }
+
+  DCHECK(shut_down_);
+}
+
+// This function reads the specified scan range associated with the
+// specified reader context and disk queue.
+void DiskIoMgr::ReadRange(
+    DiskQueue* disk_queue, RequestContext* reader, ScanRange* range) {
+  int64_t bytes_remaining = range->len_ - range->bytes_read_;
+  DCHECK_GT(bytes_remaining, 0);
+  unique_ptr<BufferDescriptor> buffer_desc;
+  if (range->external_buffer_tag_ == ScanRange::ExternalBufferTag::CLIENT_BUFFER) {
+    buffer_desc = unique_ptr<BufferDescriptor>(new BufferDescriptor(this, reader, range,
+        range->client_buffer_.data, range->client_buffer_.len, nullptr));
+  } else {
+    // Need to allocate a buffer to read into.
+    int64_t buffer_size = ::min(bytes_remaining, static_cast<int64_t>(max_buffer_size_));
+    buffer_desc = TryAllocateNextBufferForRange(disk_queue, reader, range, buffer_size);
+    if (buffer_desc == nullptr) return;
+  }
+  reader->num_used_buffers_.Add(1);
+
+  // No locks in this section. Only working on local vars. We don't want to hold a
+  // lock across the read call.
+  buffer_desc->status_ = range->Open(detail::is_file_handle_caching_enabled());
+  if (buffer_desc->status_.ok()) {
+    // Update counters.
+    if (reader->active_read_thread_counter_) {
+      reader->active_read_thread_counter_->Add(1L);
+    }
+    if (reader->disks_accessed_bitmap_) {
+      int64_t disk_bit = 1LL << disk_queue->disk_id;
+      reader->disks_accessed_bitmap_->BitOr(disk_bit);
+    }
+    SCOPED_TIMER(&read_timer_);
+    SCOPED_TIMER(reader->read_timer_);
+
+    buffer_desc->status_ = range->Read(buffer_desc->buffer_, buffer_desc->buffer_len_,
+        &buffer_desc->len_, &buffer_desc->eosr_);
+    buffer_desc->scan_range_offset_ = range->bytes_read_ - buffer_desc->len_;
+
+    if (reader->bytes_read_counter_ != nullptr) {
+      COUNTER_ADD(reader->bytes_read_counter_, buffer_desc->len_);
+    }
+
+    COUNTER_ADD(&total_bytes_read_counter_, buffer_desc->len_);
+    if (reader->active_read_thread_counter_) {
+      reader->active_read_thread_counter_->Add(-1L);
+    }
+  }
+
+  // Finished read, update reader/disk based on the results.
+  HandleReadFinished(disk_queue, reader, move(buffer_desc));
+}
+
+unique_ptr<BufferDescriptor> DiskIoMgr::TryAllocateNextBufferForRange(
+    DiskQueue* disk_queue, RequestContext* reader, ScanRange* range,
+    int64_t buffer_size) {
+  DCHECK(reader->mem_tracker_ != nullptr);
+  bool enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY;
+  if (!enough_memory) {
+    // Low memory, GC all the buffers and try again.
+    GcIoBuffers();
+    enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY;
+  }
+
+  if (!enough_memory) {
+    RequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
+    unique_lock<mutex> reader_lock(reader->lock_);
+
+    // Just grabbed the reader lock, check for cancellation.
+ if (reader->state_ == RequestContext::Cancelled) { + DCHECK(reader->Validate()) << endl << reader->DebugString(); + state.DecrementRequestThreadAndCheckDone(reader); + range->Cancel(reader->status_); + DCHECK(reader->Validate()) << endl << reader->DebugString(); + return nullptr; + } + + if (!range->ready_buffers_.empty()) { + // We have memory pressure and this range doesn't need another buffer + // (it already has one queued). Skip this range and pick it up later. + range->blocked_on_queue_ = true; + reader->blocked_ranges_.Enqueue(range); + state.DecrementRequestThread(); + return nullptr; + } else { + // We need to get a buffer anyway since there are none queued. The query + // is likely to fail due to mem limits but there's nothing we can do about that + // now. + } + } + unique_ptr<BufferDescriptor> buffer_desc = GetFreeBuffer(reader, range, buffer_size); + DCHECK(buffer_desc != nullptr); + return buffer_desc; +} + +void DiskIoMgr::Write(RequestContext* writer_context, WriteRange* write_range) { + Status ret_status = Status::OK(); + FILE* file_handle = nullptr; + // Raw open() syscall will create file if not present when passed these flags. + int fd = open(write_range->file(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR); + if (fd < 0) { + ret_status = Status(ErrorMsg(TErrorCode::DISK_IO_ERROR, + Substitute("Opening '$0' for write failed with errno=$1 description=$2", + write_range->file_, errno, GetStrErrMsg()))); + } else { + file_handle = fdopen(fd, "wb"); + if (file_handle == nullptr) { + ret_status = Status(ErrorMsg(TErrorCode::DISK_IO_ERROR, + Substitute("fdopen($0, \"wb\") failed with errno=$1 description=$2", fd, errno, + GetStrErrMsg()))); + } + } + + if (file_handle != nullptr) { + ret_status = WriteRangeHelper(file_handle, write_range); + + int success = fclose(file_handle); + if (ret_status.ok() && success != 0) { + ret_status = Status(ErrorMsg(TErrorCode::DISK_IO_ERROR, + Substitute("fclose($0) failed", write_range->file_))); + } + } + + HandleWriteFinished(writer_context, write_range, ret_status); +} + +Status DiskIoMgr::WriteRangeHelper(FILE* file_handle, WriteRange* write_range) { + // Seek to the correct offset and perform the write. 
+ int success = fseek(file_handle, write_range->offset(), SEEK_SET); + if (success != 0) { + return Status(ErrorMsg(TErrorCode::DISK_IO_ERROR, + Substitute("fseek($0, $1, SEEK_SET) failed with errno=$2 description=$3", + write_range->file_, write_range->offset(), errno, GetStrErrMsg()))); + } + +#ifndef NDEBUG + if (FLAGS_stress_scratch_write_delay_ms > 0) { + SleepForMs(FLAGS_stress_scratch_write_delay_ms); + } +#endif + int64_t bytes_written = fwrite(write_range->data_, 1, write_range->len_, file_handle); + if (bytes_written < write_range->len_) { + return Status(ErrorMsg(TErrorCode::DISK_IO_ERROR, + Substitute("fwrite(buffer, 1, $0, $1) failed with errno=$2 description=$3", + write_range->len_, write_range->file_, errno, GetStrErrMsg()))); + } + if (ImpaladMetrics::IO_MGR_BYTES_WRITTEN != nullptr) { + ImpaladMetrics::IO_MGR_BYTES_WRITTEN->Increment(write_range->len_); + } + + return Status::OK(); +} + +int DiskIoMgr::free_buffers_idx(int64_t buffer_size) { + int64_t buffer_size_scaled = BitUtil::Ceil(buffer_size, min_buffer_size_); + int idx = BitUtil::Log2Ceiling64(buffer_size_scaled); + DCHECK_GE(idx, 0); + DCHECK_LT(idx, free_buffers_.size()); + return idx; +} + +Status DiskIoMgr::AddWriteRange(RequestContext* writer, WriteRange* write_range) { + unique_lock<mutex> writer_lock(writer->lock_); + + if (writer->state_ == RequestContext::Cancelled) { + DCHECK(!writer->status_.ok()); + return writer->status_; + } + + writer->AddRequestRange(write_range, false); + return Status::OK(); +} + +int DiskIoMgr::AssignQueue(const char* file, int disk_id, bool expected_local) { + // If it's a remote range, check for an appropriate remote disk queue. + if (!expected_local) { + if (IsHdfsPath(file) && FLAGS_num_remote_hdfs_io_threads > 0) { + return RemoteDfsDiskId(); + } + if (IsS3APath(file)) return RemoteS3DiskId(); + if (IsADLSPath(file)) return RemoteAdlsDiskId(); + } + // Assign to a local disk queue. + DCHECK(!IsS3APath(file)); // S3 is always remote. + DCHECK(!IsADLSPath(file)); // ADLS is always remote. + if (disk_id == -1) { + // disk id is unknown, assign it an arbitrary one. + disk_id = next_disk_id_.Add(1); + } + // TODO: we need to parse the config for the number of dirs configured for this + // data node. + return disk_id % num_local_disks(); +} + +HdfsFileHandle* DiskIoMgr::GetCachedHdfsFileHandle(const hdfsFS& fs, + std::string* fname, int64_t mtime, RequestContext *reader, + bool require_new) { + bool cache_hit; + HdfsFileHandle* fh = file_handle_cache_.GetFileHandle(fs, fname, mtime, require_new, + &cache_hit); + if (fh == nullptr) return nullptr; + if (cache_hit) { + DCHECK(!require_new); + ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO->Update(1L); + ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_COUNT->Increment(1L); + reader->cached_file_handles_hit_count_.Add(1L); + } else { + ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO->Update(0L); + ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_MISS_COUNT->Increment(1L); + reader->cached_file_handles_miss_count_.Add(1L); + } + return fh; +} + +void DiskIoMgr::ReleaseCachedHdfsFileHandle(std::string* fname, HdfsFileHandle* fid, + bool destroy_handle) { + file_handle_cache_.ReleaseFileHandle(fname, fid, destroy_handle); +} + +Status DiskIoMgr::ReopenCachedHdfsFileHandle(const hdfsFS& fs, std::string* fname, + int64_t mtime, HdfsFileHandle** fid) { + bool cache_hit; + file_handle_cache_.ReleaseFileHandle(fname, *fid, true); + // The old handle has been destroyed, so *fid must be overwritten before returning. 
+ *fid = file_handle_cache_.GetFileHandle(fs, fname, mtime, true, + &cache_hit); + if (*fid == nullptr) { + return Status(TErrorCode::DISK_IO_ERROR, + GetHdfsErrorMsg("Failed to open HDFS file ", fname->data())); + } + DCHECK(!cache_hit); + return Status::OK(); +}
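The functions above form the entire caller-facing read path that the header in the next
hunk documents. As a bridge between the two files, here is a minimal caller-side sketch
of that path using only APIs visible in this patch. Error handling is simplified (a real
caller must still UnregisterContext() on the error paths), and the BufferDescriptor
accessors for the data itself live in request-ranges.h, which is not part of this file,
so treat this as illustrative rather than as code from the change:

Status ScanAllRanges(DiskIoMgr* io_mgr, MemTracker* reader_mem_tracker,
    const vector<ScanRange*>& ranges) {
  // One context per logical reader; it must be unregistered before destruction.
  unique_ptr<RequestContext> reader = io_mgr->RegisterContext(reader_mem_tracker);
  // Non-blocking: hand all ranges to the IoMgr up front.
  RETURN_IF_ERROR(io_mgr->AddScanRanges(reader.get(), ranges));
  while (true) {
    ScanRange* range;
    // Blocking: returns the next range the disk threads have started reading,
    // or sets it to nullptr once every range has been handed out.
    RETURN_IF_ERROR(io_mgr->GetNextRange(reader.get(), &range));
    if (range == nullptr) break;
    bool eosr = false;
    while (!eosr) {
      unique_ptr<BufferDescriptor> buffer;
      RETURN_IF_ERROR(range->GetNext(&buffer));  // Blocking per-buffer read.
      eosr = buffer->eosr();
      // ... consume the buffer's contents here ...
      io_mgr->ReturnBuffer(move(buffer));  // Required so the buffer can be recycled.
    }
  }
  // Cancels anything outstanding and blocks until the disk threads are done
  // with this context.
  io_mgr->UnregisterContext(reader.get());
  return Status::OK();
}
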
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/io/disk-io-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h new file mode 100644 index 0000000..71dc840 --- /dev/null +++ b/be/src/runtime/io/disk-io-mgr.h @@ -0,0 +1,550 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_RUNTIME_IO_DISK_IO_MGR_H +#define IMPALA_RUNTIME_IO_DISK_IO_MGR_H + +#include <deque> +#include <functional> +#include <vector> + +#include <boost/scoped_ptr.hpp> +#include <boost/unordered_set.hpp> +#include <boost/thread/mutex.hpp> + +#include "common/atomic.h" +#include "common/hdfs.h" +#include "common/object-pool.h" +#include "common/status.h" +#include "runtime/io/handle-cache.h" +#include "runtime/io/request-ranges.h" +#include "runtime/thread-resource-mgr.h" +#include "util/aligned-new.h" +#include "util/bit-util.h" +#include "util/condition-variable.h" +#include "util/error-util.h" +#include "util/runtime-profile.h" +#include "util/thread.h" + +namespace impala { + +class MemTracker; + +namespace io { +/// Manager object that schedules IO for all queries on all disks and remote filesystems +/// (such as S3). Each query maps to one or more RequestContext objects, each of which +/// has its own queue of scan ranges and/or write ranges. +// +/// The API splits up requesting scan/write ranges (non-blocking) and reading the data +/// (blocking). The DiskIoMgr has worker threads that will read from and write to +/// disk/hdfs/remote-filesystems, allowing interleaving of IO and CPU. This allows us to +/// keep all disks and all cores as busy as possible. +// +/// All public APIs are thread-safe. It is not valid to call any of the APIs after +/// UnregisterContext() returns. +// +/// For Readers: +/// We can model this problem as a multiple producer (threads for each disk), multiple +/// consumer (scan ranges) problem. There are multiple queues that need to be +/// synchronized. Conceptually, there are two queues: +/// 1. The per disk queue: this contains a queue of readers that need reads. +/// 2. The per scan range ready-buffer queue: this contains buffers that have been +/// read and are ready for the caller. +/// The disk queue contains a queue of readers and is scheduled in a round robin fashion. +/// Readers map to scan nodes. The reader then contains a queue of scan ranges. The caller +/// asks the IoMgr for the next range to process. The IoMgr then selects the best range +/// to read based on disk activity and begins reading and queuing buffers for that range. +/// TODO: We should map readers to queries. 
A reader is the unit of scheduling and queries +/// that have multiple scan nodes shouldn't have more 'turns'. +// +/// For Writers: +/// Data is written via AddWriteRange(). This is non-blocking and adds a WriteRange to a +/// per-disk queue. After the write is complete, a callback in WriteRange is invoked. +/// No memory is allocated within IoMgr for writes and no copies are made. It is the +/// responsibility of the client to ensure that the data to be written is valid and that +/// the file to be written to exists until the callback is invoked. +// +/// The IoMgr provides three key APIs. +/// 1. AddScanRanges: this is non-blocking and tells the IoMgr all the ranges that +/// will eventually need to be read. +/// 2. GetNextRange: returns to the caller the next scan range it should process. +/// This is based on disk load. This also begins reading the data in this scan +/// range. This is blocking. +/// 3. ScanRange::GetNext: returns the next buffer for this range. This is blocking. +// +/// The disk threads do not synchronize with each other. The readers and writers don't +/// synchronize with each other. There is a lock and condition variable for each request +/// context queue and each disk queue. +/// IMPORTANT: whenever both locks are needed, the lock order is to grab the context lock +/// before the disk lock. +// +/// Scheduling: If there are multiple request contexts with work for a single disk, the +/// request contexts are scheduled in round-robin order. Multiple disk threads can +/// operate on the same request context. Exactly one request range is processed by a +/// disk thread at a time. If there are multiple scan ranges scheduled via +/// GetNextRange() for a single context, these are processed in round-robin order. +/// If there are multiple scan and write ranges for a disk, a read is always followed +/// by a write, and a write is followed by a read, i.e. reads and writes alternate. +/// If multiple write ranges are enqueued for a single disk, they will be processed +/// by the disk threads in order, but may complete in any order. No guarantees are made +/// on ordering of writes across disks. +// +/// Resource Management: effective resource management in the IoMgr is key to good +/// performance. The IoMgr helps coordinate two resources: CPU and disk. For CPU, +/// spinning up too many threads causes thrashing. +/// Memory usage in the IoMgr comes from queued read buffers. If we queue the minimum +/// (i.e. 1), then the disks are idle while we are processing the buffer. If we don't +/// limit the queue, then it possible we end up queueing the entire data set (i.e. CPU +/// is slower than disks) and run out of memory. +/// For both CPU and memory, we want to model the machine as having a fixed amount of +/// resources. If a single query is running, it should saturate either CPU or Disk +/// as well as using as little memory as possible. With multiple queries, each query +/// should get less CPU. In that case each query will need fewer queued buffers and +/// therefore have less memory usage. +// +/// The IoMgr defers CPU management to the caller. The IoMgr provides a GetNextRange +/// API which will return the next scan range the caller should process. The caller +/// can call this from the desired number of reading threads. Once a scan range +/// has been returned via GetNextRange, the IoMgr will start to buffer reads for +/// that range and it is expected the caller will pull those buffers promptly. 
For +/// example, if the caller would like to have 1 scanner thread, the read loop +/// would look like: +/// while (more_ranges) +/// range = GetNextRange() +/// while (!range.eosr) +/// buffer = range.GetNext() +/// To have multiple reading threads, the caller would simply spin up the threads +/// and each would process the loops above. +// +/// To control the number of IO buffers, each scan range has a limit of two queued +/// buffers (SCAN_RANGE_READY_BUFFER_LIMIT). If the number of buffers is at capacity, +/// the IoMgr will no longer read for that scan range until the caller has processed +/// a buffer. Assuming the client returns each buffer before requesting the next one +/// from the scan range, then this will consume up to 3 * 8MB = 24MB of I/O buffers per +/// scan range. +// +/// Buffer Management: +/// Buffers for reads are either a) allocated by the IoMgr and transferred to the caller, +/// b) cached HDFS buffers if the scan range uses HDFS caching, or c) provided by the +/// caller when constructing the scan range. +/// +/// As a caller reads from a scan range, these buffers are wrapped in BufferDescriptors +/// and returned to the caller. The caller must always call ReturnBuffer() on the buffer +/// descriptor to allow recycling of the associated buffer (if there is an +/// IoMgr-allocated or HDFS cached buffer). +/// +/// Caching support: +/// Scan ranges contain metadata on whether or not it is cached on the DN. In that +/// case, we use the HDFS APIs to read the cached data without doing any copies. For these +/// ranges, the reads happen on the caller thread (as opposed to the disk threads). +/// It is possible for the cached read APIs to fail, in which case the ranges are then +/// queued on the disk threads and behave identically to the case where the range +/// is not cached. +/// Resources for these ranges are also not accounted against the reader because none +/// are consumed. +/// While a cached block is being processed, the block is mlocked. We want to minimize +/// the time the mlock is held. +/// - HDFS will time us out if we hold onto the mlock for too long +/// - Holding the lock prevents uncaching this file due to a caching policy change. +/// Therefore, we only issue the cached read when the caller is ready to process the +/// range (GetNextRange()) instead of when the ranges are issued. This guarantees that +/// there will be a CPU available to process the buffer and any throttling we do with +/// the number of scanner threads properly controls the amount of files we mlock. +/// With cached scan ranges, we cannot close the scan range until the cached buffer +/// is returned (HDFS does not allow this). We therefore need to defer the close until +/// the cached buffer is returned (ReturnBuffer()). +// +/// Remote filesystem support (e.g. S3): +/// Remote filesystems are modeled as "remote disks". That is, there is a seperate disk +/// queue for each supported remote filesystem type. In order to maximize throughput, +/// multiple connections are opened in parallel by having multiple threads running per +/// queue. Also note that reading from a remote filesystem service can be more CPU +/// intensive than local disk/hdfs because of non-direct I/O and SSL processing, and can +/// be CPU bottlenecked especially if not enough I/O threads for these queues are +/// started. +// +/// TODO: IoMgr should be able to request additional scan ranges from the coordinator +/// to help deal with stragglers. 
+/// TODO: look into using a lock free queue +/// TODO: simplify the common path (less locking, memory allocations). +// +/// Structure of the Implementation: +/// - All client APIs are defined in this file, request-ranges.h and request-context.h. +/// Clients can include only the files that they need. +/// - Some internal classes are defined in disk-io-mgr-internal.h +/// - ScanRange APIs are implemented in scan-range.cc +/// This contains the ready buffer queue logic +/// - RequestContext APIs are implemented in request-context.cc +/// This contains the logic for picking scan ranges for a reader. +/// - Disk Thread and general APIs are implemented in disk-io-mgr.cc. +/// - The handle cache is implemented in handle-cache{.inline,}.h + +// This is cache line aligned because the FileHandleCache needs cache line alignment +// for its partitions. +class DiskIoMgr : public CacheLineAligned { + public: + /// Create a DiskIoMgr object. This constructor is only used for testing. + /// - num_disks: The number of disks the IoMgr should use. Specify 0 to have the + /// IoMgr query the OS for the number of disks. + /// - threads_per_rotational_disk: number of read threads to create per rotational + /// disk. This is also the max queue depth. + /// - threads_per_solid_state_disk: number of read threads to create per solid state + /// disk. This is also the max queue depth. + /// - min_buffer_size: minimum io buffer size (in bytes) + /// - max_buffer_size: maximum io buffer size (in bytes). Also the max read size. + DiskIoMgr(int num_disks, int threads_per_rotational_disk, + int threads_per_solid_state_disk, int min_buffer_size, int max_buffer_size); + + /// Create DiskIoMgr with default configs. + DiskIoMgr(); + + /// Clean up all threads and resources. This is mostly useful for testing since + /// for impalad, this object is never destroyed. + ~DiskIoMgr(); + + /// Initialize the IoMgr. Must be called once before any of the other APIs. + Status Init(MemTracker* process_mem_tracker) WARN_UNUSED_RESULT; + + /// Allocates a tracking structure for a request context. + /// Registers a new request context and returns it to the caller. The caller must call + /// UnregisterContext() for each context. + /// reader_mem_tracker: Is non-null only for readers. IO buffers + /// used for this reader will be tracked by this. If the limit is exceeded + /// the reader will be cancelled and MEM_LIMIT_EXCEEDED will be returned via + /// GetNext(). + std::unique_ptr<RequestContext> RegisterContext(MemTracker* reader_mem_tracker); + + /// Unregisters the context from the disk IoMgr by first cancelling it and then + /// blocking until all references to the context are removed from the I/O manager's + /// internal data structures. This must be called for every RegisterContext() to ensure + /// that the context object can be safely destroyed. It is invalid to add more request + /// ranges to 'context' after this call. This call blocks until all the disk threads + /// have finished cleaning up. + void UnregisterContext(RequestContext* context); + + /// This function cancels the context asynchronously. All outstanding requests + /// are aborted and tracking structures cleaned up. This does not need to be + /// called if the context finishes normally. + /// This will also fail any outstanding GetNext()/Read() requests. + void CancelContext(RequestContext* context); + + /// Adds the scan ranges to the queues. This call is non-blocking.
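+ /// For illustration, a fuller version of the single-threaded read loop sketched in
+ /// the file comment above might look as follows (a sketch only: error handling is
+ /// elided, 'tracker' and 'ranges' are assumed caller state, and the eosr() accessor
+ /// on BufferDescriptor is an assumption rather than something this header spells out):
+ /// std::unique_ptr<RequestContext> reader = io_mgr->RegisterContext(tracker);
+ /// RETURN_IF_ERROR(io_mgr->AddScanRanges(reader.get(), ranges));
+ /// while (true) {
+ /// ScanRange* range;
+ /// RETURN_IF_ERROR(io_mgr->GetNextRange(reader.get(), &range));
+ /// if (range == nullptr) break; // No more unstarted ranges.
+ /// bool eosr = false;
+ /// while (!eosr) {
+ /// std::unique_ptr<BufferDescriptor> buffer;
+ /// RETURN_IF_ERROR(range->GetNext(&buffer));
+ /// eosr = buffer->eosr();
+ /// // ... consume the buffer's data here ...
+ /// io_mgr->ReturnBuffer(std::move(buffer));
+ /// }
+ /// }
+ /// io_mgr->UnregisterContext(reader.get());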
The caller must + /// not deallocate the scan range pointers before UnregisterContext(). + /// If 'schedule_immediately' is true, the ranges are immediately put on the read queue + /// (i.e. the caller should not/cannot call GetNextRange for these ranges). + /// This can be used to do synchronous reads as well as schedule dependent ranges, + /// as in the case of columnar formats. + Status AddScanRanges(RequestContext* reader, + const std::vector<ScanRange*>& ranges, + bool schedule_immediately = false) WARN_UNUSED_RESULT; + Status AddScanRange(RequestContext* reader, ScanRange* range, + bool schedule_immediately = false) WARN_UNUSED_RESULT; + + /// Add a WriteRange for the writer. This is non-blocking and schedules the context + /// on the IoMgr disk queue. Does not create any files. + Status AddWriteRange( + RequestContext* writer, WriteRange* write_range) WARN_UNUSED_RESULT; + + /// Returns the next unstarted scan range for this reader. When the range is returned, + /// the disk threads in the IoMgr will already have started reading from it. The + /// caller is expected to call ScanRange::GetNext on the returned range. + /// If there are no more unstarted ranges, nullptr is returned. + /// This call is blocking. + Status GetNextRange(RequestContext* reader, ScanRange** range) WARN_UNUSED_RESULT; + + /// Reads the range and returns the result in buffer. + /// This behaves like the typical synchronous read() API, blocking until the data + /// is read. This can be called while there are outstanding ScanRanges and is + /// thread safe. Multiple threads per reader can call Read() concurrently. + /// range *cannot* have already been added via AddScanRanges. + /// This can only be used if the scan range fits in a single IO buffer (i.e. is smaller + /// than max_read_buffer_size()) or if reading into a client-provided buffer. + Status Read(RequestContext* reader, ScanRange* range, + std::unique_ptr<BufferDescriptor>* buffer) WARN_UNUSED_RESULT; + + /// Returns the buffer to the IoMgr. This must be called for every buffer + /// returned by GetNext()/Read() that did not return an error. This is non-blocking. + /// After calling this, the buffer descriptor is invalid and cannot be accessed. + void ReturnBuffer(std::unique_ptr<BufferDescriptor> buffer); + + /// Determine which disk queue this file should be assigned to. Returns an index into + /// disk_queues_. The disk_id is the volume ID for the local disk that holds the + /// files, or -1 if unknown. Flag expected_local is true iff this impalad is + /// co-located with the datanode for this file. + int AssignQueue(const char* file, int disk_id, bool expected_local); + + /// TODO: The functions below can be moved to RequestContext. + /// Returns the current status of the context.
+ Status context_status(RequestContext* context) const WARN_UNUSED_RESULT; + + void set_bytes_read_counter(RequestContext*, RuntimeProfile::Counter*); + void set_read_timer(RequestContext*, RuntimeProfile::Counter*); + void set_active_read_thread_counter(RequestContext*, RuntimeProfile::Counter*); + void set_disks_access_bitmap(RequestContext*, RuntimeProfile::Counter*); + + int64_t queue_size(RequestContext* reader) const; + int64_t bytes_read_local(RequestContext* reader) const; + int64_t bytes_read_short_circuit(RequestContext* reader) const; + int64_t bytes_read_dn_cache(RequestContext* reader) const; + int num_remote_ranges(RequestContext* reader) const; + int64_t unexpected_remote_bytes(RequestContext* reader) const; + int cached_file_handles_hit_count(RequestContext* reader) const; + int cached_file_handles_miss_count(RequestContext* reader) const; + + /// Returns the read throughput across all readers. + /// TODO: should this be a sliding window? This should report metrics for the + /// last minute, hour and since the beginning. + int64_t GetReadThroughput(); + + /// Returns the maximum read buffer size. + int max_read_buffer_size() const { return max_buffer_size_; } + + /// Returns the total number of disk queues (both local and remote). + int num_total_disks() const { return disk_queues_.size(); } + + /// Returns the total number of remote "disk" queues. + int num_remote_disks() const { return REMOTE_NUM_DISKS; } + + /// Returns the number of local disks attached to the system. + int num_local_disks() const { return num_total_disks() - num_remote_disks(); } + + /// The disk ID (and therefore disk_queues_ index) used for DFS accesses. + int RemoteDfsDiskId() const { return num_local_disks() + REMOTE_DFS_DISK_OFFSET; } + + /// The disk ID (and therefore disk_queues_ index) used for S3 accesses. + int RemoteS3DiskId() const { return num_local_disks() + REMOTE_S3_DISK_OFFSET; } + + /// The disk ID (and therefore disk_queues_ index) used for ADLS accesses. + int RemoteAdlsDiskId() const { return num_local_disks() + REMOTE_ADLS_DISK_OFFSET; } + + /// Dumps the disk IoMgr queues (for readers and disks). + std::string DebugString(); + + /// Validates that the internal state is consistent. This is intended to be used + /// only for debugging. + bool Validate() const; + + /// Given a FS handle and the name and last modified time of the file, gets an + /// HdfsFileHandle from the file handle cache. If 'require_new_handle' is true, the + /// cache will open a fresh file handle. On success, records statistics about whether + /// this was a cache hit or miss in the 'reader' as well as at the system level. + /// Returns nullptr in case of an error. + HdfsFileHandle* GetCachedHdfsFileHandle(const hdfsFS& fs, + std::string* fname, int64_t mtime, RequestContext *reader, + bool require_new_handle); + + /// Releases a file handle back to the file handle cache when it is no longer in use. + /// If 'destroy_handle' is true, the file handle cache will close the file handle + /// immediately. + void ReleaseCachedHdfsFileHandle(std::string* fname, HdfsFileHandle* fid, + bool destroy_handle); + + /// Reopens a file handle by destroying the file handle and getting a fresh + /// file handle from the cache. Returns an error if the file could not be reopened. + Status ReopenCachedHdfsFileHandle(const hdfsFS& fs, std::string* fname, int64_t mtime, + HdfsFileHandle** fid); + + /// Garbage collect unused I/O buffers up to 'bytes_to_free', or all the buffers if + /// 'bytes_to_free' is -1.
+ void GcIoBuffers(int64_t bytes_to_free = -1); + + /// The maximum number of ready buffers that can be queued in a scan range. Having two + /// queued buffers (plus the buffer that is returned to the client) gives good + /// performance in most scenarios: + /// 1. If the consumer is consuming data faster than we can read from disk, then the + /// queue will be empty most of the time because the buffer will be immediately + /// pulled off the queue as soon as it is added. There will always be an I/O request + /// in the disk queue to maximize I/O throughput, which is the bottleneck in this + /// case. + /// 2. If we can read from disk faster than the consumer is consuming data, the queue + /// will fill up and there will always be a buffer available for the consumer to + /// read, so the consumer will not block and we maximize consumer throughput, which + /// is the bottleneck in this case. + /// 3. If the consumer is consuming data at approximately the same rate as we are + /// reading from disk, then the steady state is that the consumer is processing one + /// buffer and one buffer is in the disk queue. The additional buffer can absorb + /// bursts where the producer runs faster than the consumer or the consumer runs + /// faster than the producer without blocking either the producer or consumer. + static const int SCAN_RANGE_READY_BUFFER_LIMIT = 2; + + /// "Disk" queue offsets for remote accesses. Offset 0 corresponds to + /// disk ID (i.e. disk_queue_ index) of num_local_disks(). + enum { + REMOTE_DFS_DISK_OFFSET = 0, + REMOTE_S3_DISK_OFFSET, + REMOTE_ADLS_DISK_OFFSET, + REMOTE_NUM_DISKS + }; + + private: + friend class BufferDescriptor; + friend class RequestContext; + // TODO: remove io:: prefix - it is required for the "using ScanRange" workaround above. + friend class io::ScanRange; + struct DiskQueue; + + friend class DiskIoMgrTest_Buffers_Test; + friend class DiskIoMgrTest_VerifyNumThreadsParameter_Test; + + /// Memory tracker for unused I/O buffers owned by DiskIoMgr. + boost::scoped_ptr<MemTracker> free_buffer_mem_tracker_; + + /// Memory tracker for I/O buffers where the RequestContext has no MemTracker. + /// TODO: once IMPALA-3200 is fixed, there should be no more cases where readers don't + /// provide a MemTracker. + boost::scoped_ptr<MemTracker> unowned_buffer_mem_tracker_; + + /// Number of worker (read) threads per rotational disk. Also the max depth of queued + /// work to the disk. + const int num_io_threads_per_rotational_disk_; + + /// Number of worker (read) threads per solid state disk. Also the max depth of queued + /// work to the disk. + const int num_io_threads_per_solid_state_disk_; + + /// Maximum read size. This is also the maximum size of each allocated buffer. + const int max_buffer_size_; + + /// The minimum size of each read buffer. + const int min_buffer_size_; + + /// Thread group containing all the worker threads. + ThreadGroup disk_thread_group_; + + /// Options object for cached hdfs reads. Set on startup and never modified. + struct hadoopRzOptions* cached_read_options_ = nullptr; + + /// True if the IoMgr should be torn down. Worker threads watch for this to + /// know to terminate. This variable is read/written to by different threads. + volatile bool shut_down_; + + /// Total bytes read by the IoMgr.
+ RuntimeProfile::Counter total_bytes_read_counter_; + + /// Total time spent reading from HDFS. + RuntimeProfile::Counter read_timer_; + + /// Protects free_buffers_ + boost::mutex free_buffers_lock_; + + /// Free buffers that can be handed out to clients. There is one list for each buffer + /// size, indexed by the Log2 of the buffer size in units of min_buffer_size_. The + /// maximum buffer size is max_buffer_size_, so the maximum index is + /// Log2(max_buffer_size_ / min_buffer_size_). + // + /// E.g. if min_buffer_size_ = 1024 bytes: + /// free_buffers_[0] => list of free buffers with size 1024 B + /// free_buffers_[1] => list of free buffers with size 2048 B + /// free_buffers_[10] => list of free buffers with size 1 MB + /// free_buffers_[13] => list of free buffers with size 8 MB + /// free_buffers_[n] => list of free buffers with size 2^n * 1024 B + std::vector<std::deque<uint8_t*>> free_buffers_; + + /// Total number of allocated buffers, used for debugging. + AtomicInt32 num_allocated_buffers_; + + /// Total number of buffers in readers + AtomicInt32 num_buffers_in_readers_; + + /// Per disk queues. This is static and created once at Init() time. One queue is + /// allocated for each local disk on the system and for each remote filesystem type. + /// It is indexed by disk id. + std::vector<DiskQueue*> disk_queues_; + + /// The next disk queue to write to if the actual 'disk_id_' is unknown (i.e. the file + /// is not associated with a particular local disk or remote queue). Used to implement + /// round-robin assignment for that case. + static AtomicInt32 next_disk_id_; + + // Number of file handle cache partitions to use + static const size_t NUM_FILE_HANDLE_CACHE_PARTITIONS = 16; + + // Caching structure that maps file names to cached file handles. The cache has an + // upper limit on the number of entries defined by FLAGS_max_cached_file_handles. + // Evicted cached file handles are closed. + FileHandleCache<NUM_FILE_HANDLE_CACHE_PARTITIONS> file_handle_cache_; + + /// Returns the index into free_buffers_ for a given buffer size + int free_buffers_idx(int64_t buffer_size); + + /// Returns a buffer to read into with size between 'buffer_size' and + /// 'max_buffer_size_'. If there is an appropriately-sized free buffer in + /// 'free_buffers_', that is returned, otherwise a new one is allocated. + /// The returned *buffer_size must be between 0 and 'max_buffer_size_'. + /// The buffer memory is tracked against the reader's mem tracker, or + /// 'unowned_buffer_mem_tracker_' if the reader does not have one. + std::unique_ptr<BufferDescriptor> GetFreeBuffer( + RequestContext* reader, ScanRange* range, int64_t buffer_size); + + /// Disassociates the desc->buffer_ memory from 'desc' (which cannot be nullptr), either + /// freeing it or returning it to 'free_buffers_'. Memory tracking is updated to + /// reflect the transfer of ownership from desc->mem_tracker_ to the disk I/O mgr. + void FreeBufferMemory(BufferDescriptor* desc); + + /// Disk worker thread loop. This function retrieves the next range to process on + /// the disk queue and invokes ReadRange() or Write() depending on the type of the + /// range. There can be multiple threads per disk running this loop. + void WorkLoop(DiskQueue* queue); + + /// This is called from the disk thread to get the next range to process. It will + /// wait until a scan range and buffer are available, or a write range is available. + /// This function returns the range to process. + /// Only returns false if the disk thread should be shut down.
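+ /// As a rough sketch of how a disk thread consumes this API (an assumed shape, not
+ /// the verbatim implementation; the request_type() accessor and the RequestType
+ /// constants come from request-ranges.h):
+ /// RequestRange* range;
+ /// RequestContext* context;
+ /// while (GetNextRequestRange(disk_queue, &range, &context)) {
+ /// if (range->request_type() == RequestType::READ) {
+ /// ReadRange(disk_queue, context, static_cast<ScanRange*>(range));
+ /// } else {
+ /// Write(context, static_cast<WriteRange*>(range));
+ /// }
+ /// }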
+ /// No locks should be taken before this function call and none are left taken after. + bool GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range, + RequestContext** request_context); + + /// Updates disk queue and reader state after a read is complete. The read result + /// is captured in the buffer descriptor. + void HandleReadFinished(DiskQueue* disk_queue, RequestContext* reader, + std::unique_ptr<BufferDescriptor> buffer); + + /// Invokes write_range->callback_ after the range has been written and + /// updates per-disk state and handle state. The status of the write (OK, + /// RUNTIME_ERROR, etc.) is passed via 'write_status' and on to the callback. + /// The write_status does not affect the writer->status_. That is, a write error does + /// not cancel the writer context - that decision is left to the callback handler. + /// TODO: On the read path, consider not canceling the reader context on error. + void HandleWriteFinished( + RequestContext* writer, WriteRange* write_range, const Status& write_status); + + /// Validates that the range is correctly initialized. + Status ValidateScanRange(ScanRange* range) WARN_UNUSED_RESULT; + + /// Writes the specified range to disk and calls HandleWriteFinished when done. + /// Responsible for opening and closing the file that is written. + void Write(RequestContext* writer_context, WriteRange* write_range); + + /// Helper method to write a range using the specified FILE handle. Returns + /// Status::OK() if the write succeeded, or a RUNTIME_ERROR with an appropriate + /// message otherwise. Does not open or close the file that is written. + Status WriteRangeHelper(FILE* file_handle, WriteRange* write_range) WARN_UNUSED_RESULT; + + /// Reads the specified scan range and calls HandleReadFinished when done. + void ReadRange(DiskQueue* disk_queue, RequestContext* reader, ScanRange* range); + + /// Try to allocate the next buffer for the scan range, returning the new buffer + /// if successful. If 'reader' is cancelled, cancels the range and returns nullptr. + /// If there is memory pressure and buffers are already queued, adds the range + /// to the blocked ranges and returns nullptr. + std::unique_ptr<BufferDescriptor> TryAllocateNextBufferForRange(DiskQueue* disk_queue, + RequestContext* reader, ScanRange* range, int64_t buffer_size); +}; +} +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/io/handle-cache.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/handle-cache.h b/be/src/runtime/io/handle-cache.h new file mode 100644 index 0000000..78f91cd --- /dev/null +++ b/be/src/runtime/io/handle-cache.h @@ -0,0 +1,197 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#ifndef IMPALA_RUNTIME_DISK_IO_MGR_HANDLE_CACHE_H +#define IMPALA_RUNTIME_DISK_IO_MGR_HANDLE_CACHE_H + +#include <array> +#include <list> +#include <map> +#include <memory> + +#include <boost/thread/mutex.hpp> + +#include "common/hdfs.h" +#include "common/status.h" +#include "util/aligned-new.h" +#include "util/impalad-metrics.h" +#include "util/spinlock.h" +#include "util/thread.h" + +namespace impala { +namespace io { + +/// This class is a small wrapper around the hdfsFile handle and the file system +/// instance which is needed to close the file handle. The handle incorporates +/// the last modified time of the file when it was opened. This is used to distinguish +/// between file handles for files that can be updated or overwritten. +class HdfsFileHandle { + public: + + /// Constructor will open the file + HdfsFileHandle(const hdfsFS& fs, const char* fname, int64_t mtime); + + /// Destructor will close the file handle + ~HdfsFileHandle(); + + hdfsFile file() const { return hdfs_file_; } + int64_t mtime() const { return mtime_; } + bool ok() const { return hdfs_file_ != nullptr; } + + private: + hdfsFS fs_; + hdfsFile hdfs_file_; + int64_t mtime_; +}; + +/// The FileHandleCache is a data structure that owns HdfsFileHandles to share between +/// threads. The HdfsFileHandles are hash partitioned across NUM_PARTITIONS partitions. +/// Each partition operates independently with its own locks, reducing contention +/// between concurrent threads. The `capacity` is split between the partitions and is +/// enforced independently. +/// +/// Threads check out a file handle for exclusive access and return it when finished. +/// If the file handle is not already present in the cache or all file handles for this +/// file are checked out, a new file handle is constructed and added to the cache. +/// The cache can contain multiple file handles for the same file. If a file handle +/// is checked out, it cannot be evicted from the cache. In this case, the cache can +/// exceed the specified capacity. +/// +/// The file handle cache is currently not suitable for remote files that maintain a +/// connection as part of the handle. Most remote systems have a limit on the number +/// of concurrent connections, and file handles in the cache would be counted towards +/// that limit. +/// +/// If there is a file handle in the cache and the underlying file is deleted, +/// the file handle might keep the file from being deleted at the OS level. This can +/// take up disk space and impact correctness. To avoid this, the cache will evict any +/// file handle that has been unused for longer than the threshold specified by +/// `unused_handle_timeout_secs`. Eviction is disabled when the threshold is 0. +/// +/// TODO: The cache should also evict file handles more aggressively if the file handle's +/// mtime is older than the file's current mtime. +template <size_t NUM_PARTITIONS> +class FileHandleCache { + public: + /// Instantiates the cache with `capacity` split evenly across NUM_PARTITIONS + /// partitions. If the capacity does not split evenly, then the capacity is rounded + /// up. The cache will age out any file handle that is unused for + /// `unused_handle_timeout_secs` seconds. Age out is disabled if this is set to zero. + FileHandleCache(size_t capacity, uint64_t unused_handle_timeout_secs); + + /// Destructor is only called for backend tests + ~FileHandleCache(); + + /// Starts up a thread that monitors the age of file handles and evicts any that + /// exceed the limit.
+ Status Init() WARN_UNUSED_RESULT; + + /// Get a file handle from the cache for the specified filename (fname) and + /// last modification time (mtime). This will hash the filename to determine + /// which partition to use for this file handle. + /// + /// If 'require_new_handle' is false and the partition contains an available handle, + /// the handle is returned and 'cache_hit' is set to true. Otherwise, the partition + /// will try to construct a file handle and add it to the partition. On success, the + /// new file handle will be returned with 'cache_hit' set to false. On failure, nullptr + /// will be returned. In either case, the partition may evict a file handle to make + /// room for the new file handle. + /// + /// This obtains exclusive control over the returned file handle. It must be paired + /// with a call to ReleaseFileHandle to release exclusive control. + HdfsFileHandle* GetFileHandle(const hdfsFS& fs, std::string* fname, int64_t mtime, + bool require_new_handle, bool* cache_hit); + + /// Release the exclusive hold on the specified file handle (which was obtained + /// by calling GetFileHandle). The cache may evict a file handle if the cache is + /// above capacity. If 'destroy_handle' is true, immediately remove this handle + /// from the cache. + void ReleaseFileHandle(std::string* fname, HdfsFileHandle* fh, bool destroy_handle); + + private: + struct FileHandleEntry; + typedef std::multimap<std::string, FileHandleEntry> MapType; + + struct LruListEntry { + LruListEntry(typename MapType::iterator map_entry_in); + typename MapType::iterator map_entry; + uint64_t timestamp_seconds; + }; + typedef std::list<LruListEntry> LruListType; + + struct FileHandleEntry { + FileHandleEntry(HdfsFileHandle* fh_in, LruListType& lru_list) + : fh(fh_in), lru_entry(lru_list.end()) {} + std::unique_ptr<HdfsFileHandle> fh; + + /// in_use is true for a file handle checked out via GetFileHandle() that has not + /// been returned via ReleaseFileHandle(). + bool in_use = false; + + /// Iterator to this element's location in the LRU list. This only points to a + /// valid location when in_use is false. For error-checking, this is set to + /// lru_list.end() when in_use is true. + typename LruListType::iterator lru_entry; + }; + + /// Each partition operates independently, and thus has its own cache, LRU list, + /// and corresponding lock. To avoid contention on the lock due to false sharing, + /// the partitions are aligned to cache line boundaries. + struct FileHandleCachePartition : public CacheLineAligned { + /// Protects access to cache and lru_list. + SpinLock lock; + + /// Multimap from the file name to the file handles for that file. The cache + /// can contain multiple file handles for the same file and some may have + /// different mtimes if the file is being modified. All file handles are always + /// owned by the cache. + MapType cache; + + /// The LRU list only contains file handles that are not in use. + LruListType lru_list; + + /// Maximum number of file handles in cache without evicting unused file handles. + /// It is not a strict limit, and can be exceeded if all file handles are in use. + size_t capacity; + + /// Current number of file handles in the cache. + size_t size; + }; + + /// Periodic check to evict unused file handles. Only executed by eviction_thread_. + void EvictHandlesLoop(); + static const int64_t EVICT_HANDLES_PERIOD_MS = 1000; + + /// If the partition is above its capacity, evict the oldest unused file handles to + /// enforce the capacity.
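+ /// In pseudocode, the intended policy is roughly (illustrative only, not the
+ /// verbatim implementation):
+ /// while (p.size > p.capacity && !p.lru_list.empty()) {
+ /// close the handle referenced by p.lru_list.front() and erase its cache entry;
+ /// p.lru_list.pop_front();
+ /// --p.size;
+ /// }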
+ void EvictHandles(FileHandleCachePartition& p); + + std::array<FileHandleCachePartition, NUM_PARTITIONS> cache_partitions_; + + /// Maximum time before an unused file handle is aged out of the cache. + /// Aging out is disabled if this is set to 0. + uint64_t unused_handle_timeout_secs_; + + /// Thread to check for unused file handles to evict. This thread will exit when + /// the shut_down_promise_ is set. + std::unique_ptr<Thread> eviction_thread_; + Promise<bool> shut_down_promise_; +}; +} +} + +#endif
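A short note on usage: the check-out/check-in contract above can be made concrete with a small sketch (hypothetical code, not part of this patch: 'fs', 'fname' and 'mtime' are assumed caller state, and the capacity/timeout values simply mirror the flag defaults discussed earlier):

    // A cache with 16 partitions, ~20k handles and a 6-hour unused-handle timeout.
    impala::io::FileHandleCache<16> cache(20000, 21600);
    RETURN_IF_ERROR(cache.Init());  // Starts the eviction thread.
    bool cache_hit;
    impala::io::HdfsFileHandle* fh =
        cache.GetFileHandle(fs, &fname, mtime, /* require_new_handle */ false, &cache_hit);
    if (fh == nullptr) return impala::Status("could not obtain a file handle");
    // ... read through fh->file() while holding exclusive access ...
    cache.ReleaseFileHandle(&fname, fh, /* destroy_handle */ false);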