Repository: impala Updated Branches: refs/heads/master 3ff4cde77 -> bdd904922
IMPALA-7402: fix DCHECK when releasing reservation in scan The bug is that ScannerContext::Stream::GetNextBuffer(), when reading past the end of a scan range and ScanRange::GetNext() returned CANCELLED, did not wait for the buffers owned by the scan range to be freed. Subsequent code assumes that all buffers allocated by the scanner are freed after HdfsScanner::Close() returns, but this was not guaranteed. The fix is to strengthen the post-condition of ScanRange::GetNext() so that, when GetNext() returns CANCELLED, all buffers owned by the scan range are guaranteed to have been either returned to callers of GetNext() or freed. Testing: Added a unit test that checks the new invariant. Manually tested that this fixed the regression by inserting a 10ms sleep in BufferPool::FreeBuffer() and looping the test that failed. Ran DiskIoMgrStressTest overnight and ran core tests. Change-Id: I445d306de0c6bfb71359100de2fdf3cd4326f6d9 Reviewed-on: http://gerrit.cloudera.org:8080/11283 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/bdd90492 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/bdd90492 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/bdd90492 Branch: refs/heads/master Commit: bdd904922a220c37326928ac674779acaef5f6fa Parents: 3ff4cde Author: Tim Armstrong <[email protected]> Authored: Mon Aug 20 20:00:10 2018 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Wed Aug 22 02:22:50 2018 +0000 ---------------------------------------------------------------------- be/src/runtime/io/disk-io-mgr-test.cc | 52 ++++++++++++++++++++++++++++++ be/src/runtime/io/request-ranges.h | 3 +- be/src/runtime/io/scan-range.cc | 14 ++++---- 3 files changed, 62 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/bdd90492/be/src/runtime/io/disk-io-mgr-test.cc 
---------------------------------------------------------------------- diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc index 3d89d04..6c84e19 100644 --- a/be/src/runtime/io/disk-io-mgr-test.cc +++ b/be/src/runtime/io/disk-io-mgr-test.cc @@ -1307,6 +1307,58 @@ TEST_F(DiskIoMgrTest, CancelReleasesResources) { io_mgr.UnregisterContext(reader.get()); } +// Regression test for IMPALA-7402 - RequestContext::Cancel() propagation via +// ScanRange::GetNext() does not guarantee buffers are released. +TEST_F(DiskIoMgrTest, FinalGetNextReleasesResources) { + InitRootReservation(LARGE_RESERVATION_LIMIT); + const char* tmp_file = "/tmp/disk_io_mgr_test.txt"; + const char* data = "the quick brown fox jumped over the lazy dog"; + int len = strlen(data); + const int64_t MIN_BUFFER_SIZE = 2; + const int64_t MAX_BUFFER_SIZE = 1024; + CreateTempFile(tmp_file, data); + + // Get mtime for file + struct stat stat_val; + stat(tmp_file, &stat_val); + + const int NUM_DISK_THREADS = 20; + DiskIoMgr io_mgr( + 1, NUM_DISK_THREADS, NUM_DISK_THREADS, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE); +#ifndef NDEBUG + auto s = ScopedFlagSetter<int32_t>::Make(&FLAGS_stress_disk_read_delay_ms, 5); +#endif + + ASSERT_OK(io_mgr.Init()); + BufferPool::ClientHandle read_client; + RegisterBufferPoolClient( + LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client); + + for (int i = 0; i < 10; ++i) { + unique_ptr<RequestContext> reader = io_mgr.RegisterContext(); + ScanRange* range = InitRange(&pool_, tmp_file, 0, len, 0, stat_val.st_mtime); + bool needs_buffers; + ASSERT_OK(reader->StartScanRange(range, &needs_buffers)); + EXPECT_TRUE(needs_buffers); + ASSERT_OK(io_mgr.AllocateBuffersForRange(&read_client, range, MAX_BUFFER_SIZE)); + // Give disk I/O thread a chance to start read. + SleepForMs(1); + + reader->Cancel(); + // The scan range should hold no resources once ScanRange::GetNext() returns. 
+ unique_ptr<BufferDescriptor> buffer; + Status status = range->GetNext(&buffer); + if (status.ok()) { + DCHECK(buffer->eosr()); + range->ReturnBuffer(move(buffer)); + } + EXPECT_EQ(0, read_client.GetUsedReservation()) << " iter " << i << ": " + << status.GetDetail(); + io_mgr.UnregisterContext(reader.get()); + } + buffer_pool()->DeregisterClient(&read_client); +} + // Test reading into a client-allocated buffer. TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) { InitRootReservation(LARGE_RESERVATION_LIMIT); http://git-wip-us.apache.org/repos/asf/impala/blob/bdd90492/be/src/runtime/io/request-ranges.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h index 22c6c36..95e832d 100644 --- a/be/src/runtime/io/request-ranges.h +++ b/be/src/runtime/io/request-ranges.h @@ -242,7 +242,8 @@ class ScanRange : public RequestRange { /// Returns the next buffer for this scan range. buffer is an output parameter. /// This function blocks until a buffer is ready or an error occurred. If this is /// called when all buffers have been returned, *buffer is set to nullptr and Status::OK - /// is returned. + /// is returned. If this returns buffer->eosr() or an error status, then all buffers + /// owned by the scan range were either returned to callers of GetNext() or freed. /// Only one thread can be in GetNext() at any time.
Status GetNext(std::unique_ptr<BufferDescriptor>* buffer) WARN_UNUSED_RESULT; http://git-wip-us.apache.org/repos/asf/impala/blob/bdd90492/be/src/runtime/io/scan-range.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc index 6028fb4..12037ca 100644 --- a/be/src/runtime/io/scan-range.cc +++ b/be/src/runtime/io/scan-range.cc @@ -82,14 +82,16 @@ Status ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) { { unique_lock<mutex> scan_range_lock(lock_); DCHECK(Validate()) << DebugString(); - // No more buffers to return - return the cancel status or OK if not cancelled. - if (all_buffers_returned(scan_range_lock)) return cancel_status_; - - while (ready_buffers_.empty() && cancel_status_.ok()) { + while (!all_buffers_returned(scan_range_lock) && ready_buffers_.empty()) { buffer_ready_cv_.Wait(scan_range_lock); } - /// Propagate cancellation to the client if it happened while we were waiting. - RETURN_IF_ERROR(cancel_status_); + // No more buffers to return - return the cancel status or OK if not cancelled. + if (all_buffers_returned(scan_range_lock)) { + // Wait until read finishes to ensure buffers are freed. + while (read_in_flight_) buffer_ready_cv_.Wait(scan_range_lock); + DCHECK_EQ(0, ready_buffers_.size()); + return cancel_status_; + } // Remove the first ready buffer from the queue and return it DCHECK(!ready_buffers_.empty());
