IMPALA-6587: free buffers before ScanRange::Cancel() returns ScanRange::Cancel() now waits until an in-flight read finishes so that the disk I/O buffer being processed by the disk thread is freed when Cancel() returns.
The fix is to set a 'read_in_flight_' flag on the scan range while the disk thread is doing the read. Cancel() blocks until read_in_flight_ == false. The code is refactored to move more logic into ScanRange and to avoid holding RequestContext::lock_ for longer than necessary. Testing: Added query test that reproduces the issue. Added a unit test and a stress option that reproduces the problem in a targeted way. Ran disk-io-mgr-stress test for a few hours. Ran it under TSAN and inspected output to make sure there were no non-benign data races. Change-Id: I87182b6bd51b5fb0b923e7e4c8d08a44e7617db2 Reviewed-on: http://gerrit.cloudera.org:8080/9680 Reviewed-by: Tim Armstrong <[email protected]> Tested-by: Tim Armstrong <[email protected]> Reviewed-on: http://gerrit.cloudera.org:8080/10271 Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/82c43f4f Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/82c43f4f Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/82c43f4f Branch: refs/heads/2.x Commit: 82c43f4f151b56dbae3c74c086a8c401fc5d14bf Parents: 9bf324e Author: Tim Armstrong <[email protected]> Authored: Wed Feb 28 09:49:25 2018 -0800 Committer: Tim Armstrong <[email protected]> Committed: Thu May 3 15:28:03 2018 +0000 ---------------------------------------------------------------------- be/src/common/global-flags.cc | 2 + be/src/runtime/io/disk-io-mgr-internal.h | 9 +- be/src/runtime/io/disk-io-mgr-stress.cc | 2 + be/src/runtime/io/disk-io-mgr-test.cc | 49 +++++++ be/src/runtime/io/disk-io-mgr.cc | 100 +------------ be/src/runtime/io/disk-io-mgr.h | 19 +-- be/src/runtime/io/request-context.cc | 32 ++++- be/src/runtime/io/request-context.h | 16 ++- be/src/runtime/io/request-ranges.h | 79 ++++++---- be/src/runtime/io/scan-range.cc | 143 +++++++++++++++---- .../queries/QueryTest/scanners.test | 11 ++ 11 files changed, 288 insertions(+), 174 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/82c43f4f/be/src/common/global-flags.cc ---------------------------------------------------------------------- diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc index 11d8db8..8ea1c90 100644 --- a/be/src/common/global-flags.cc +++ b/be/src/common/global-flags.cc @@ -148,6 +148,8 @@ DEFINE_bool(thread_creation_fault_injection, false, "A fault injection option th DEFINE_int32(stress_catalog_init_delay_ms, 0, "A stress option that injects extra delay" " in milliseconds when initializing an impalad's local catalog replica. Delay <= 0" " inject no delay."); +DEFINE_int32(stress_disk_read_delay_ms, 0, "A stress option that injects extra delay" + " in milliseconds when the I/O manager is reading from disk."); #endif // Used for testing the path where the Kudu client is stubbed. http://git-wip-us.apache.org/repos/asf/impala/blob/82c43f4f/be/src/runtime/io/disk-io-mgr-internal.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/disk-io-mgr-internal.h b/be/src/runtime/io/disk-io-mgr-internal.h index 292530f..475456a 100644 --- a/be/src/runtime/io/disk-io-mgr-internal.h +++ b/be/src/runtime/io/disk-io-mgr-internal.h @@ -39,6 +39,8 @@ /// This file contains internal structures shared between submodules of the IoMgr. Users /// of the IoMgr do not need to include this file. +DECLARE_uint64(max_cached_file_handles); + // Macros to work around counters sometimes not being provided. // TODO: fix things so that counters are always non-NULL. 
#define COUNTER_ADD_IF_NOT_NULL(c, v) \ @@ -56,8 +58,13 @@ namespace impala { namespace io { +// Indicates if file handle caching should be used +static inline bool is_file_handle_caching_enabled() { + return FLAGS_max_cached_file_handles > 0; +} + /// Per disk state -struct DiskIoMgr::DiskQueue { +struct DiskQueue { /// Disk id (0-based) int disk_id; http://git-wip-us.apache.org/repos/asf/impala/blob/82c43f4f/be/src/runtime/io/disk-io-mgr-stress.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc index 3fd33de..fd360c4 100644 --- a/be/src/runtime/io/disk-io-mgr-stress.cc +++ b/be/src/runtime/io/disk-io-mgr-stress.cc @@ -247,6 +247,8 @@ void DiskIoMgrStress::NewClient(int i) { // Clean up leftover state from the previous client (if any). client.scan_ranges.clear(); ExecEnv* exec_env = ExecEnv::GetInstance(); + if (client.reader != nullptr) io_mgr_->UnregisterContext(client.reader.get()); + exec_env->buffer_pool()->DeregisterClient(&buffer_pool_clients_[i]); if (client_mem_trackers_[i] != nullptr) client_mem_trackers_[i]->Close(); client.obj_pool.Clear(); http://git-wip-us.apache.org/repos/asf/impala/blob/82c43f4f/be/src/runtime/io/disk-io-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc index 95b2d03..3e8a9ae 100644 --- a/be/src/runtime/io/disk-io-mgr-test.cc +++ b/be/src/runtime/io/disk-io-mgr-test.cc @@ -33,10 +33,12 @@ #include "service/fe-support.h" #include "testutil/gtest-util.h" #include "testutil/rand-util.h" +#include "testutil/scoped-flag-setter.h" #include "util/condition-variable.h" #include "util/cpu-info.h" #include "util/disk-info.h" #include "util/thread.h" +#include "util/time.h" #include "common/names.h" @@ -48,6 +50,9 @@ DECLARE_int64(min_buffer_size); DECLARE_int32(num_remote_hdfs_io_threads); 
DECLARE_int32(num_s3_io_threads); DECLARE_int32(num_adls_io_threads); +#ifndef NDEBUG +DECLARE_int32(stress_disk_read_delay_ms); +#endif const int MIN_BUFFER_SIZE = 128; const int MAX_BUFFER_SIZE = 1024; @@ -1260,6 +1265,50 @@ TEST_F(DiskIoMgrTest, SkipAllocateBuffers) { io_mgr.UnregisterContext(reader.get()); } +// Regression test for IMPALA-6587 - all buffers should be released after Cancel(). +TEST_F(DiskIoMgrTest, CancelReleasesResources) { + InitRootReservation(LARGE_RESERVATION_LIMIT); + const char* tmp_file = "/tmp/disk_io_mgr_test.txt"; + const char* data = "the quick brown fox jumped over the lazy dog"; + int len = strlen(data); + const int64_t MIN_BUFFER_SIZE = 2; + const int64_t MAX_BUFFER_SIZE = 1024; + CreateTempFile(tmp_file, data); + + // Get mtime for file + struct stat stat_val; + stat(tmp_file, &stat_val); + + const int NUM_DISK_THREADS = 20; + DiskIoMgr io_mgr( + 1, NUM_DISK_THREADS, NUM_DISK_THREADS, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE); +#ifndef NDEBUG + auto s = ScopedFlagSetter<int32_t>::Make(&FLAGS_stress_disk_read_delay_ms, 5); +#endif + + ASSERT_OK(io_mgr.Init()); + unique_ptr<RequestContext> reader = io_mgr.RegisterContext(); + BufferPool::ClientHandle read_client; + RegisterBufferPoolClient( + LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client); + + for (int i = 0; i < 10; ++i) { + ScanRange* range = InitRange(&pool_, tmp_file, 0, len, 0, stat_val.st_mtime); + bool needs_buffers; + ASSERT_OK(io_mgr.StartScanRange(reader.get(), range, &needs_buffers)); + EXPECT_TRUE(needs_buffers); + ASSERT_OK(io_mgr.AllocateBuffersForRange(reader.get(), &read_client, range, MAX_BUFFER_SIZE)); + // Give disk I/O thread a chance to start read. + SleepForMs(1); + + range->Cancel(Status::CANCELLED); + // Resources should be released immediately once Cancel() returns. 
+ EXPECT_EQ(0, read_client.GetUsedReservation()) << " iter " << i; + } + buffer_pool()->DeregisterClient(&read_client); + io_mgr.UnregisterContext(reader.get()); +} + // Test reading into a client-allocated buffer. TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) { InitRootReservation(LARGE_RESERVATION_LIMIT); http://git-wip-us.apache.org/repos/asf/impala/blob/82c43f4f/be/src/runtime/io/disk-io-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc index 8933fec..50f6fc4 100644 --- a/be/src/runtime/io/disk-io-mgr.cc +++ b/be/src/runtime/io/disk-io-mgr.cc @@ -129,13 +129,6 @@ DEFINE_uint64(num_file_handle_cache_partitions, 16, "Number of partitions used b AtomicInt32 DiskIoMgr::next_disk_id_; -namespace detail { -// Indicates if file handle caching should be used -static inline bool is_file_handle_caching_enabled() { - return FLAGS_max_cached_file_handles > 0; -} -} - string DiskIoMgr::DebugString() { stringstream ss; ss << "Disks: " << endl; @@ -549,7 +542,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range, disk_queue->request_contexts.pop_front(); DCHECK(*request_context != nullptr); request_disk_state = &((*request_context)->disk_states_[disk_id]); - request_disk_state->IncrementDiskThreadAndDequeue(); + request_disk_state->IncrementDiskThreadAfterDequeue(); } // NOTE: no locks were taken in between. 
We need to be careful about what state @@ -643,50 +636,6 @@ void DiskIoMgr::HandleWriteFinished( } } -void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, RequestContext* reader, - Status read_status, unique_ptr<BufferDescriptor> buffer) { - unique_lock<mutex> reader_lock(reader->lock_); - - RequestContext::PerDiskState* disk_state = &reader->disk_states_[disk_queue->disk_id]; - DCHECK(reader->Validate()) << endl << reader->DebugString(); - DCHECK_GT(disk_state->num_threads_in_op(), 0); - DCHECK(buffer->buffer_ != nullptr); - DCHECK(!buffer->is_cached()) << "HDFS cache reads don't go through this code path."; - - // After calling EnqueueReadyBuffer() below, it is no longer valid to read from buffer. - // Store the state we need before calling EnqueueReadyBuffer(). - bool eosr = buffer->eosr_; - - // TODO: IMPALA-4249: it safe to touch 'scan_range' until DecrementDiskThread() is - // called because all clients of DiskIoMgr keep ScanRange objects alive until they - // unregister their RequestContext. - ScanRange* scan_range = buffer->scan_range_; - bool scan_range_done = eosr; - if (read_status.ok() && reader->state_ != RequestContext::Cancelled) { - DCHECK_EQ(reader->state_, RequestContext::Active); - // Read successfully - update the reader's scan ranges. There are two cases here: - // 1. End of scan range or cancelled scan range - don't need to reschedule. - // 2. Middle of scan range - need to schedule to read next buffer. - bool enqueued = scan_range->EnqueueReadyBuffer(reader_lock, move(buffer)); - if (!eosr && enqueued) reader->ScheduleScanRange(reader_lock, scan_range); - } else { - // The scan range will be cancelled, either because we hit an error or because the - // request context was cancelled. The buffer is not needed - we must free it. - reader->FreeBuffer(buffer.get()); - // Propagate 'read_status' to the scan range. If we are here because the context - // was cancelled, the scan range is already cancelled so we do not need to re-cancel - // it. 
- if (!read_status.ok()) scan_range->CancelFromReader(reader_lock, read_status); - scan_range_done = true; - } - if (scan_range_done) { - scan_range->Close(); - --disk_state->num_remaining_ranges(); - } - DCHECK(reader->Validate()) << endl << reader->DebugString(); - disk_state->DecrementDiskThread(reader_lock, reader); -} - void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) { // The thread waits until there is work or the entire system is being shut down. // If there is work, performs the read or write requested and re-enqueues the @@ -707,7 +656,9 @@ void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) { return; } if (range->request_type() == RequestType::READ) { - ReadRange(disk_queue, worker_context, static_cast<ScanRange*>(range)); + ScanRange* scan_range = static_cast<ScanRange*>(range); + ReadOutcome outcome = scan_range->DoRead(disk_queue->disk_id); + worker_context->ReadDone(disk_queue->disk_id, outcome, scan_range); } else { DCHECK(range->request_type() == RequestType::WRITE); Write(worker_context, static_cast<WriteRange*>(range)); @@ -715,49 +666,6 @@ void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) { } } -void DiskIoMgr::ReadRange( - DiskQueue* disk_queue, RequestContext* reader, ScanRange* range) { - int64_t bytes_remaining = range->len_ - range->bytes_read_; - DCHECK_GT(bytes_remaining, 0); - unique_ptr<BufferDescriptor> buffer_desc; - if (range->external_buffer_tag_ == ScanRange::ExternalBufferTag::CLIENT_BUFFER) { - buffer_desc = unique_ptr<BufferDescriptor>(new BufferDescriptor(this, reader, range, - range->client_buffer_.data, range->client_buffer_.len)); - } else { - DCHECK(range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER) - << "This code path does not handle other buffer types, i.e. HDFS cache" - << static_cast<int>(range->external_buffer_tag_); - buffer_desc = range->GetNextUnusedBufferForRange(); - if (buffer_desc == nullptr) { - // No buffer available - the range will be rescheduled when a buffer is added. 
- unique_lock<mutex> reader_lock(reader->lock_); - reader->disk_states_[disk_queue->disk_id].DecrementDiskThread(reader_lock, reader); - DCHECK(reader->Validate()) << endl << reader->DebugString(); - return; - } - } - - // No locks in this section. Only working on local vars. We don't want to hold a - // lock across the read call. - Status read_status = range->Open(detail::is_file_handle_caching_enabled()); - if (read_status.ok()) { - // Update counters. - COUNTER_ADD_IF_NOT_NULL(reader->active_read_thread_counter_, 1L); - COUNTER_BITOR_IF_NOT_NULL(reader->disks_accessed_bitmap_, 1LL << disk_queue->disk_id); - - read_status = range->Read(buffer_desc->buffer_, buffer_desc->buffer_len_, - &buffer_desc->len_, &buffer_desc->eosr_); - buffer_desc->scan_range_offset_ = range->bytes_read_ - buffer_desc->len_; - - COUNTER_ADD_IF_NOT_NULL(reader->bytes_read_counter_, buffer_desc->len_); - COUNTER_ADD(&total_bytes_read_counter_, buffer_desc->len_); - COUNTER_ADD_IF_NOT_NULL(reader->active_read_thread_counter_, -1L); - } - - // Finished read, update reader/disk based on the results - HandleReadFinished(disk_queue, reader, read_status, move(buffer_desc)); -} - void DiskIoMgr::Write(RequestContext* writer_context, WriteRange* write_range) { Status ret_status = Status::OK(); FILE* file_handle = nullptr; http://git-wip-us.apache.org/repos/asf/impala/blob/82c43f4f/be/src/runtime/io/disk-io-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h index 17aa211..b6b4b75 100644 --- a/be/src/runtime/io/disk-io-mgr.h +++ b/be/src/runtime/io/disk-io-mgr.h @@ -44,6 +44,8 @@ namespace impala { namespace io { + +struct DiskQueue; /// Manager object that schedules IO for all queries on all disks and remote filesystems /// (such as S3). Each query maps to one or more RequestContext objects, each of which /// has its own queue of scan ranges and/or write ranges. 
@@ -393,7 +395,6 @@ class DiskIoMgr : public CacheLineAligned { friend class RequestContext; // TODO: remove io:: prefix - it is required for the "using ScanRange" workaround above. friend class io::ScanRange; - struct DiskQueue; friend class DiskIoMgrTest_Buffers_Test; friend class DiskIoMgrTest_BufferSizeSelection_Test; @@ -448,8 +449,8 @@ class DiskIoMgr : public CacheLineAligned { FileHandleCache file_handle_cache_; /// Disk worker thread loop. This function retrieves the next range to process on - /// the disk queue and invokes ReadRange() or Write() depending on the type of Range(). - /// There can be multiple threads per disk running this loop. + /// the disk queue and invokes ScanRange::DoRead() or Write() depending on the type + /// of Range. There can be multiple threads per disk running this loop. void WorkLoop(DiskQueue* queue); /// This is called from the disk thread to get the next range to process. It will @@ -460,13 +461,6 @@ class DiskIoMgr : public CacheLineAligned { bool GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range, RequestContext** request_context); - /// Updates disk queue and reader state after a read is complete. If the read - /// was successful, 'read_status' is ok and 'buffer' contains the result of the - /// read. If the read failed with an error, 'read_status' contains the error and - /// 'buffer' has the buffer that was meant to hold the result of the read. - void HandleReadFinished(DiskQueue* disk_queue, RequestContext* reader, - Status read_status, std::unique_ptr<BufferDescriptor> buffer); - /// Invokes write_range->callback_ after the range has been written and /// updates per-disk state and handle state. The status of the write OK/RUNTIME_ERROR /// etc. is passed via write_status and to the callback. @@ -488,11 +482,6 @@ class DiskIoMgr : public CacheLineAligned { /// Does not open or close the file that is written. 
Status WriteRangeHelper(FILE* file_handle, WriteRange* write_range) WARN_UNUSED_RESULT; - /// Reads the specified scan range and calls HandleReadFinished() when done. If no - /// buffer is available to read the range's data into, the read cannot proceed, the - /// range becomes blocked and this function returns without doing I/O. - void ReadRange(DiskQueue* disk_queue, RequestContext* reader, ScanRange* range); - /// Helper for AllocateBuffersForRange() to compute the buffer sizes for a scan range /// with length 'scan_range_len', given that 'max_bytes' of memory should be allocated. std::vector<int64_t> ChooseBufferSizes(int64_t scan_range_len, int64_t max_bytes); http://git-wip-us.apache.org/repos/asf/impala/blob/82c43f4f/be/src/runtime/io/request-context.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/request-context.cc b/be/src/runtime/io/request-context.cc index dec6aa6..dd8ff24 100644 --- a/be/src/runtime/io/request-context.cc +++ b/be/src/runtime/io/request-context.cc @@ -64,6 +64,34 @@ void RequestContext::FreeBuffer(BufferDescriptor* buffer) { buffer->buffer_ = nullptr; } +void RequestContext::ReadDone(int disk_id, ReadOutcome outcome, ScanRange* range) { + // TODO: IMPALA-4249: it is safe to touch 'range' until DecrementDiskThread() is + // called because all clients of DiskIoMgr keep ScanRange objects alive until they + // unregister their RequestContext. + unique_lock<mutex> lock(lock_); + RequestContext::PerDiskState* disk_state = &disk_states_[disk_id]; + DCHECK_GT(disk_state->num_threads_in_op(), 0); + if (outcome == ReadOutcome::SUCCESS_EOSR) { + // No more reads to do. + --disk_state->num_remaining_ranges(); + } else if (outcome == ReadOutcome::SUCCESS_NO_EOSR) { + // Schedule the next read. 
+ if (state_ != RequestContext::Cancelled) { + ScheduleScanRange(lock, range); + } + } else if (outcome == ReadOutcome::BLOCKED_ON_BUFFER) { + // Do nothing - the caller must add a buffer to the range or cancel it. + } else { + DCHECK(outcome == ReadOutcome::CANCELLED) << static_cast<int>(outcome); + // No more reads - clean up the scan range. + --disk_state->num_remaining_ranges(); + RemoveActiveScanRangeLocked(lock, range); + } + // Release refcount that was taken in IncrementDiskThreadAfterDequeue(). + disk_state->DecrementDiskThread(lock, this); + DCHECK(Validate()) << endl << DebugString(); +} + // Cancellation of a RequestContext requires coordination from multiple threads that may // hold references to the context: // 1. Disk threads that are currently processing a range for this context. @@ -104,7 +132,9 @@ void RequestContext::Cancel() { // Clear out all request ranges from queues for this reader. Cancel the scan // ranges and invoke the write range callbacks to propagate the cancellation. - for (ScanRange* range : active_scan_ranges_) range->CancelInternal(Status::CANCELLED); + for (ScanRange* range : active_scan_ranges_) { + range->CancelInternal(Status::CANCELLED, false); + } active_scan_ranges_.clear(); for (PerDiskState& disk_state : disk_states_) { RequestRange* range; http://git-wip-us.apache.org/repos/asf/impala/blob/82c43f4f/be/src/runtime/io/request-context.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/request-context.h b/be/src/runtime/io/request-context.h index b028596..adfe6d5 100644 --- a/be/src/runtime/io/request-context.h +++ b/be/src/runtime/io/request-context.h @@ -201,6 +201,11 @@ class RequestContext { state.ScheduleContext(lock, this, range->disk_id()); } + /// Called from a disk thread when a read completes. Decrements the disk thread count + /// and other bookkeeping and re-schedules 'range' if there are more reads to do. + /// Caller must not hold 'lock_'. 
+ void ReadDone(int disk_id, ReadOutcome outcome, ScanRange* range); + /// Cancel the context if not already cancelled, wait for all scan ranges to finish /// and mark the context as inactive, after which it cannot be used. void CancelAndMarkInactive(); @@ -367,13 +372,14 @@ class RequestContext { void ScheduleContext(const boost::unique_lock<boost::mutex>& context_lock, RequestContext* context, int disk_id); - /// Increment the count of disk threads that have a reference to this context. These - /// threads do not hold any locks while reading from HDFS, so we need to prevent the - /// RequestContext from being destroyed underneath them. + /// Called when dequeueing this RequestContext from the disk queue to increment the + /// count of disk threads with a reference to this context. These threads do not hold + /// any locks while reading from HDFS, so we need to prevent the RequestContext from + /// being destroyed underneath them. /// /// The caller does not need to hold 'lock_', so this can execute concurrently with /// itself and DecrementDiskThread(). - void IncrementDiskThreadAndDequeue() { + void IncrementDiskThreadAfterDequeue() { /// Incrementing 'num_threads_in_op_' first so that there is no window when other /// threads see 'is_on_queue_ == num_threads_in_op_ == 0' and think there are no /// references left to this context. @@ -395,7 +401,7 @@ class RequestContext { } // The state is cancelled, check to see if we're the last thread to touch the // context on this disk. We need to load 'is_on_queue_' and 'num_threads_in_op_' - // in this order to avoid a race with IncrementDiskThreadAndDequeue(). + // in this order to avoid a race with IncrementDiskThreadAfterDequeue(). 
if (is_on_queue_.Load() == 0 && num_threads_in_op_.Load() == 0 && !done_) { context->DecrementDiskRefCount(context_lock); done_ = true; http://git-wip-us.apache.org/repos/asf/impala/blob/82c43f4f/be/src/runtime/io/request-ranges.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h index 7ec0cb6..3e266d6 100644 --- a/be/src/runtime/io/request-ranges.h +++ b/be/src/runtime/io/request-ranges.h @@ -34,8 +34,9 @@ namespace impala { namespace io { class DiskIoMgr; -class RequestContext; +struct DiskQueue; class ExclusiveHdfsFileHandle; +class RequestContext; class ScanRange; /// Buffer struct that is used by the caller and IoMgr to pass read buffers. @@ -114,6 +115,20 @@ struct RequestType { }; }; +/// ReadOutcome describes the possible outcomes of the DoRead() function. +enum class ReadOutcome { + // The last (eosr) buffer was successfully enqueued. + SUCCESS_EOSR, + // The buffer was successfully enqueued but we are not at eosr and can schedule + // the next read. + SUCCESS_NO_EOSR, + // The scan range is blocked waiting for the next buffer. + BLOCKED_ON_BUFFER, + // The scan range is cancelled (either by caller or because of an error). No more + // reads will be scheduled. + CANCELLED +}; + /// Represents a contiguous sequence of bytes in a single file. /// This is the common base class for read and write IO requests - ScanRange and /// WriteRange. Each disk thread processes exactly one RequestRange at a time. @@ -236,12 +251,12 @@ class ScanRange : public RequestRange { /// and cannot be accessed. void ReturnBuffer(std::unique_ptr<BufferDescriptor> buffer); - /// Cancel this scan range. This cleans up all queued buffers and wakes up any threads - /// blocked on GetNext(). Status is a non-ok status with the reason the range was - /// cancelled, e.g. 
CANCELLED if the range was cancelled because it was not needed, or - another error if an error was encountered while scanning the range. Status is - returned to the any callers of GetNext(). If a thread is currently blocked in - GetNext(), it is woken up. + /// Cancel this scan range. This waits for any in-flight read operations to complete, + /// cleans up all buffers owned by the scan range (i.e. queued or unused buffers) + /// and wakes up any threads blocked on GetNext(). Status is a non-ok status with the + /// reason the range was cancelled, e.g. CANCELLED if the range was cancelled because + /// it was not needed, or another error if an error was encountered while scanning the + /// range. Status is returned to any callers of GetNext(). void Cancel(const Status& status); /// return a descriptive string for debug. @@ -259,10 +274,9 @@ class ScanRange : public RequestRange { /// Enqueues a ready buffer with valid data for this range. This does not block. /// The caller passes ownership of buffer to the scan range and it is not - /// valid to access buffer after this call. The reader lock must be held by the - /// caller. Returns false if the scan range was cancelled. - bool EnqueueReadyBuffer(const boost::unique_lock<boost::mutex>& reader_lock, - std::unique_ptr<BufferDescriptor> buffer); + /// valid to access buffer after this call. Returns false if the scan range was + /// cancelled. + bool EnqueueReadyBuffer(std::unique_ptr<BufferDescriptor> buffer); /// Validates the internal state of this range. lock_ must be taken /// before calling this. @@ -324,10 +338,10 @@ class ScanRange : public RequestRange { std::unique_ptr<BufferDescriptor> GetUnusedBuffer( const boost::unique_lock<boost::mutex>& scan_range_lock); - /// Get the next buffer for this scan range for a disk thread to read into. Returns - /// the new buffer if successful. If no buffers are available, marks the range - /// as blocked and returns nullptr. Called must not hold 'lock_'. 
- std::unique_ptr<BufferDescriptor> GetNextUnusedBufferForRange(); + /// Called from a disk I/O thread to read the next buffer of data for this range. The + /// returned ReadOutcome describes what the result of the read was. 'disk_id' is the + /// ID of the disk queue. Caller must not hold 'lock_'. + ReadOutcome DoRead(int disk_id); /// Cleans up a buffer that was not returned to the client. /// Either ReturnBuffer() or CleanUpBuffer() is called for every BufferDescriptor. @@ -344,15 +358,18 @@ class ScanRange : public RequestRange { /// range is cancelled or at eos. The caller must hold 'lock_' via 'scan_range_lock'. void CleanUpUnusedBuffers(const boost::unique_lock<boost::mutex>& scan_range_lock); - /// Same as Cancel() except reader_->lock must be held by the caller. - void CancelFromReader(const boost::unique_lock<boost::mutex>& reader_lock, - const Status& status); - /// Same as Cancel() except doesn't remove the scan range from - /// reader_->active_scan_ranges_. This is invoked by RequestContext::Cancel(), - /// which removes the range itself to avoid invalidating its active_scan_ranges_ - /// iterator. - void CancelInternal(const Status& status); + /// reader_->active_scan_ranges_ or wait for in-flight reads to finish. + /// This is invoked by RequestContext::Cancel(), which removes the range itself + /// to avoid invalidating its active_scan_ranges_ iterator. If 'read_error' is + /// true, this is being called from a disk thread to propagate a read error, so + /// 'read_in_flight_' is set to false and threads in WaitForInFlightRead() are + /// woken up. + void CancelInternal(const Status& status, bool read_error); + + /// Waits for any in-flight read to complete. Called after CancelInternal() to ensure + /// no more reads will occur for the scan range. + void WaitForInFlightRead(); /// Marks the scan range as blocked waiting for a buffer. Caller must not hold 'lock_'. 
void SetBlockedOnBuffer(); @@ -451,10 +468,15 @@ class ScanRange : public RequestRange { /// Total number of bytes of buffers in 'unused_iomgr_buffers_'. int64_t unused_iomgr_buffer_bytes_ = 0; - /// Number of bytes of buffers returned from GetNextUnusedBufferForRange(). Used to - /// infer how many bytes of buffers need to be held onto to read the rest of the scan - /// range. - int64_t iomgr_buffer_bytes_returned_ = 0; + /// Cumulative bytes of I/O mgr buffers taken from 'unused_iomgr_buffers_' by DoRead(). + /// Used to infer how many bytes of buffers need to be held onto to read the rest of + /// the scan range. + int64_t iomgr_buffer_cumulative_bytes_used_ = 0; + + /// True if a disk thread is currently doing a read for this scan range. Set to true in + /// DoRead() and set to false in EnqueueReadyBuffer() or CancelInternal() when the + /// read completes and any buffer used for the read is either enqueued or freed. + bool read_in_flight_ = false; /// If true, the last buffer for this scan range has been queued. /// If this is true and 'ready_buffers_' is empty, then no more buffers will be @@ -472,7 +494,8 @@ class ScanRange : public RequestRange { /// 'cancel_status_' is not OK. std::deque<std::unique_ptr<BufferDescriptor>> ready_buffers_; - /// Condition variable for threads in GetNext() that are waiting for the next buffer. + /// Condition variable for threads in GetNext() that are waiting for the next buffer + /// and threads in WaitForInFlightRead() that are waiting for a read to finish. /// Signalled when a buffer is enqueued in 'ready_buffers_' or the scan range is /// cancelled. 
ConditionVariable buffer_ready_cv_; http://git-wip-us.apache.org/repos/asf/impala/blob/82c43f4f/be/src/runtime/io/scan-range.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc index 4f7c38b..c868c3d 100644 --- a/be/src/runtime/io/scan-range.cc +++ b/be/src/runtime/io/scan-range.cc @@ -36,23 +36,33 @@ DEFINE_bool(use_hdfs_pread, false, "Enables using hdfsPread() instead of hdfsRea DEFINE_int64(adls_read_chunk_size, 128 * 1024, "The maximum read chunk size to use when " "reading from ADLS."); +#ifndef NDEBUG +DECLARE_int32(stress_disk_read_delay_ms); +#endif + // Implementation of the ScanRange functionality. Each ScanRange contains a queue // of ready buffers. For each ScanRange, there is only a single producer and // consumer thread, i.e. only one disk thread will push to a scan range at // any time and only one thread will remove from the queue. This is to guarantee // that buffers are queued and read in file order. - -bool ScanRange::EnqueueReadyBuffer( - const unique_lock<mutex>& reader_lock, unique_ptr<BufferDescriptor> buffer) { - DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock()); +bool ScanRange::EnqueueReadyBuffer(unique_ptr<BufferDescriptor> buffer) { DCHECK(buffer->buffer_ != nullptr) << "Cannot enqueue freed buffer"; { unique_lock<mutex> scan_range_lock(lock_); DCHECK(Validate()) << DebugString(); DCHECK(!eosr_queued_); + if (!buffer->is_cached()) { + // All non-cached buffers are enqueued by disk threads. Indicate that the read + // finished. + DCHECK(read_in_flight_); + read_in_flight_ = false; + } if (!cancel_status_.ok()) { // This range has been cancelled, no need to enqueue the buffer. CleanUpBuffer(scan_range_lock, move(buffer)); + // One or more threads may be blocked in WaitForInFlightRead() waiting for the read + // to complete. Wake up all of them. 
+ buffer_ready_cv_.NotifyAll(); return false; } // Clean up any surplus buffers. E.g. we may have allocated too many if the file was @@ -122,7 +132,7 @@ void ScanRange::AddUnusedBuffers(vector<unique_ptr<BufferDescriptor>>&& buffers, if (external_buffer_tag_ != ExternalBufferTag::NO_BUFFER || !cancel_status_.ok() || eosr_queued_ - || unused_iomgr_buffer_bytes_ >= len_ - iomgr_buffer_bytes_returned_) { + || unused_iomgr_buffer_bytes_ >= len_ - iomgr_buffer_cumulative_bytes_used_) { CleanUpBuffer(scan_range_lock, move(buffer)); } else { unused_iomgr_buffer_bytes_ += buffer->buffer_len(); @@ -156,15 +166,77 @@ unique_ptr<BufferDescriptor> ScanRange::GetUnusedBuffer( return result; } -unique_ptr<BufferDescriptor> ScanRange::GetNextUnusedBufferForRange() { - unique_lock<mutex> lock(lock_); - unique_ptr<BufferDescriptor> buffer_desc = GetUnusedBuffer(lock); - if (buffer_desc == nullptr) { - blocked_on_buffer_ = true; - } else { - iomgr_buffer_bytes_returned_ += buffer_desc->buffer_len(); +ReadOutcome ScanRange::DoRead(int disk_id) { + int64_t bytes_remaining = len_ - bytes_read_; + DCHECK_GT(bytes_remaining, 0); + + unique_ptr<BufferDescriptor> buffer_desc; + { + unique_lock<mutex> lock(lock_); + DCHECK(!read_in_flight_); + if (!cancel_status_.ok()) return ReadOutcome::CANCELLED; + + if (external_buffer_tag_ == ScanRange::ExternalBufferTag::CLIENT_BUFFER) { + buffer_desc = unique_ptr<BufferDescriptor>(new BufferDescriptor( + io_mgr_, reader_, this, client_buffer_.data, client_buffer_.len)); + } else { + DCHECK(external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER) + << "This code path does not handle other buffer types, i.e. HDFS cache" + << static_cast<int>(external_buffer_tag_); + buffer_desc = GetUnusedBuffer(lock); + if (buffer_desc == nullptr) { + // No buffer available - the range will be rescheduled when a buffer is added. 
+ blocked_on_buffer_ = true; + return ReadOutcome::BLOCKED_ON_BUFFER; + } + iomgr_buffer_cumulative_bytes_used_ += buffer_desc->buffer_len(); + } + read_in_flight_ = true; } - return buffer_desc; + + // No locks in this section. Only working on local vars. We don't want to hold a + // lock across the read call. + Status read_status = Open(is_file_handle_caching_enabled()); + if (read_status.ok()) { + COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, 1L); + COUNTER_BITOR_IF_NOT_NULL(reader_->disks_accessed_bitmap_, 1LL << disk_id); + + read_status = Read(buffer_desc->buffer_, buffer_desc->buffer_len_, + &buffer_desc->len_, &buffer_desc->eosr_); + buffer_desc->scan_range_offset_ = bytes_read_ - buffer_desc->len_; + + COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, buffer_desc->len_); + COUNTER_ADD(&io_mgr_->total_bytes_read_counter_, buffer_desc->len_); + COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, -1L); + } + + DCHECK(buffer_desc->buffer_ != nullptr); + DCHECK(!buffer_desc->is_cached()) << "HDFS cache reads don't go through this code path."; + if (!read_status.ok()) { + // Free buffer to release resources before we cancel the range so that all buffers + // are freed at cancellation. + reader_->FreeBuffer(buffer_desc.get()); + buffer_desc.reset(); + + // Propagate 'read_status' to the scan range. This will also wake up any waiting + // threads. + CancelInternal(read_status, true); + // No more reads for this scan range - we can close it. + Close(); + return ReadOutcome::CANCELLED; + } + + // After calling EnqueueReadyBuffer(), it is no longer valid to touch 'buffer_desc'. + // Store the state we need before calling EnqueueReadyBuffer(). + bool eosr = buffer_desc->eosr_; + // Read successful - enqueue the buffer and return the appropriate outcome. + if (!EnqueueReadyBuffer(move(buffer_desc))) return ReadOutcome::CANCELLED; + if (eosr) { + // No more reads for this scan range - we can close it. 
+ Close(); + return ReadOutcome::SUCCESS_EOSR; + } + return ReadOutcome::SUCCESS_NO_EOSR; } void ScanRange::SetBlockedOnBuffer() { @@ -202,18 +274,14 @@ void ScanRange::CleanUpUnusedBuffers(const unique_lock<mutex>& scan_range_lock) void ScanRange::Cancel(const Status& status) { // Cancelling a range that was never started, ignore. if (io_mgr_ == nullptr) return; - CancelInternal(status); + CancelInternal(status, false); + // Wait until an in-flight read is finished. The read thread will clean up the + // buffer it used. Once the range is cancelled, no more reads should be started. + WaitForInFlightRead(); reader_->RemoveActiveScanRange(this); } -void ScanRange::CancelFromReader(const boost::unique_lock<boost::mutex>& reader_lock, - const Status& status) { - DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock()); - CancelInternal(status); - reader_->RemoveActiveScanRangeLocked(reader_lock, this); -} - -void ScanRange::CancelInternal(const Status& status) { +void ScanRange::CancelInternal(const Status& status, bool read_error) { DCHECK(io_mgr_ != nullptr); DCHECK(!status.ok()); { @@ -223,10 +291,10 @@ void ScanRange::CancelInternal(const Status& status) { { unique_lock<mutex> hdfs_lock(hdfs_lock_); DCHECK(Validate()) << DebugString(); - // If already cancelled, preserve the original reason for cancellation. The first - // thread to set 'cancel_status_' does the cleanup below. - RETURN_VOID_IF_ERROR(cancel_status_); - cancel_status_ = status; + // If already cancelled, preserve the original reason for cancellation. Most of the + // cleanup is not required if already cancelled, but we need to set + // 'read_in_flight_' to false. + if (cancel_status_.ok()) cancel_status_ = status; } /// Clean up 'ready_buffers_' while still holding 'lock_' to prevent other threads @@ -238,6 +306,10 @@ void ScanRange::CancelInternal(const Status& status) { /// Clean up buffers that we don't need any more because we won't read any more data. 
CleanUpUnusedBuffers(scan_range_lock); + if (read_error) { + DCHECK(read_in_flight_); + read_in_flight_ = false; + } } buffer_ready_cv_.NotifyAll(); @@ -246,6 +318,11 @@ void ScanRange::CancelInternal(const Status& status) { if (external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) Close(); } +void ScanRange::WaitForInFlightRead() { + unique_lock<mutex> scan_range_lock(lock_); + while (read_in_flight_) buffer_ready_cv_.Wait(scan_range_lock); +} + string ScanRange::DebugString() const { stringstream ss; ss << "file=" << file_ << " disk_id=" << disk_id_ << " offset=" << offset_ @@ -303,6 +380,7 @@ ScanRange::~ScanRange() { DCHECK(exclusive_hdfs_fh_ == nullptr) << "File was not closed."; DCHECK(external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) << "Cached buffer was not released."; + DCHECK(!read_in_flight_); DCHECK_EQ(0, ready_buffers_.size()); DCHECK_EQ(0, num_buffers_in_reader_.Load()); } @@ -310,6 +388,7 @@ ScanRange::~ScanRange() { void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id, bool expected_local, const BufferOpts& buffer_opts, void* meta_data) { DCHECK(ready_buffers_.empty()); + DCHECK(!read_in_flight_); DCHECK(file != nullptr); DCHECK_GE(len, 0); DCHECK_GE(offset, 0); @@ -340,13 +419,14 @@ void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, void ScanRange::InitInternal(DiskIoMgr* io_mgr, RequestContext* reader) { DCHECK(exclusive_hdfs_fh_ == nullptr); DCHECK(local_file_ == nullptr); + DCHECK(!read_in_flight_); io_mgr_ = io_mgr; reader_ = reader; local_file_ = nullptr; exclusive_hdfs_fh_ = nullptr; bytes_read_ = 0; unused_iomgr_buffer_bytes_ = 0; - iomgr_buffer_bytes_returned_ = 0; + iomgr_buffer_cumulative_bytes_used_ = 0; cancel_status_ = Status::OK(); eosr_queued_ = false; blocked_on_buffer_ = false; @@ -474,6 +554,13 @@ int64_t ScanRange::MaxReadChunkSize() const { // TODO: look at linux disk scheduling Status ScanRange::Read( uint8_t* buffer, int64_t buffer_len, int64_t* 
bytes_read, bool* eosr) { + DCHECK(read_in_flight_); + // Delay before acquiring the lock, to allow triggering IMPALA-6587 race. +#ifndef NDEBUG + if (FLAGS_stress_disk_read_delay_ms > 0) { + SleepForMs(FLAGS_stress_disk_read_delay_ms); + } +#endif unique_lock<mutex> hdfs_lock(hdfs_lock_); RETURN_IF_ERROR(cancel_status_); @@ -664,7 +751,7 @@ Status ScanRange::ReadFromCache( desc->scan_range_offset_ = 0; desc->eosr_ = true; bytes_read_ = bytes_read; - EnqueueReadyBuffer(reader_lock, move(desc)); + EnqueueReadyBuffer(move(desc)); COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, bytes_read); *read_succeeded = true; return Status::OK(); http://git-wip-us.apache.org/repos/asf/impala/blob/82c43f4f/testdata/workloads/functional-query/queries/QueryTest/scanners.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/scanners.test b/testdata/workloads/functional-query/queries/QueryTest/scanners.test index a000a15..b05786e 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/scanners.test +++ b/testdata/workloads/functional-query/queries/QueryTest/scanners.test @@ -117,3 +117,14 @@ select * from emptytable; ---- TYPES STRING,INT ==== +---- QUERY +# IMPALA-6587: regression test for reservation not being managed correctly. Should be +# able to execute this query reliably with the minimum reservation, even with tiny +# scan ranges. This reliably reproduced the issue when run against text/lzo. +set max_scan_range_length=1; +select count(*) from alltypessmall +---- RESULTS +100 +---- TYPES +BIGINT +====
