IMPALA-3936: BufferedBlockMgr fixes for Pin() while write in flight. Fix multiple bugs that could occur when a block was unpinned then pinned again while the write was in flight. There were two problems:
1. A block's buffer could be transferred while a write is in flight, leaving the block in an invalid state. The fix is to wait for the in-flight write to complete before transferring the buffer. 2. On certain code paths in WriteComplete(), condition variables weren't signalled, leading to threads waiting for write completion not being woken up. The fix is to clarify when condition variables will be signalled and ensure that the appropriate condition variables are always signalled when the write completes. Testing: Added a targeted unit test that exercises these code paths using a debug option that controls timing of writes. Reran the stress test configuration that reproducibly triggered the bug: TPC-H query 18 on a release build with a single impalad. It succeeded. Change-Id: I4be4fad8e6f2303db19ea1e2bd0f13523781ae8e Reviewed-on: http://gerrit.cloudera.org:8080/3832 Reviewed-by: Tim Armstrong <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/f4da9251 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/f4da9251 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/f4da9251 Branch: refs/heads/master Commit: f4da9251346129189806a969288f2f7c4532bbe5 Parents: 15d20d8 Author: Tim Armstrong <[email protected]> Authored: Thu Jul 28 23:18:24 2016 -0700 Committer: Internal Jenkins <[email protected]> Committed: Mon Aug 15 23:33:57 2016 +0000 ---------------------------------------------------------------------- be/src/runtime/buffered-block-mgr-test.cc | 56 +++++++++++++++++++++++ be/src/runtime/buffered-block-mgr.cc | 63 +++++++++++++++++--------- be/src/runtime/buffered-block-mgr.h | 29 +++++++++--- 3 files changed, 121 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f4da9251/be/src/runtime/buffered-block-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-block-mgr-test.cc b/be/src/runtime/buffered-block-mgr-test.cc index bdef7cd..5b5ee8a 100644 --- a/be/src/runtime/buffered-block-mgr-test.cc +++ b/be/src/runtime/buffered-block-mgr-test.cc @@ -774,6 +774,62 @@ TEST_F(BufferedBlockMgrTest, DeleteSingleBlocks) { TearDownMgrs(); } +// This exercises a code path where: +// 1. A block A is unpinned. +// 2. A block B is unpinned. +// 3. A write for block A is initiated. +// 4. Block A is pinned. +// 5. Block B is pinned, with block A passed in to be deleted. +// Block A's buffer will be transferred to block B. +// 6. The write for block A completes. +// Previously there was a bug (IMPALA-3936) where the buffer transfer happened before the +// write completed. There were also various hangs related to missing condition variable +// notifications. +TEST_F(BufferedBlockMgrTest, TransferBufferDuringWrite) { + const int trials = 5; + const int max_num_buffers = 2; + BufferedBlockMgr::Client* client; + RuntimeState* query_state; + BufferedBlockMgr* block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size_, + 1, false, client_tracker_.get(), &client, &query_state); + + for (int trial = 0; trial < trials; ++trial) { + for (int delay_ms = 0; delay_ms <= 10; delay_ms += 5) { + // Force writes to be delayed to enlarge window of opportunity for bug. + block_mgr->set_debug_write_delay_ms(delay_ms); + vector<BufferedBlockMgr::Block*> blocks; + AllocateBlocks(block_mgr, client, 2, &blocks); + + // Force the second block to be written and have its buffer freed. + // We only have one buffer to share between the first and second blocks now. + ASSERT_OK(blocks[1]->Unpin()); + + // Create another client. 
Reserving different numbers of buffers can send it + // down different code paths because the original client is entitled to different + // number of buffers. + int reserved_buffers = trial % max_num_buffers; + BufferedBlockMgr::Client* tmp_client; + EXPECT_TRUE(block_mgr->RegisterClient("tmp_client", reserved_buffers, false, + client_tracker_.get(), query_state, &tmp_client).ok()); + BufferedBlockMgr::Block* tmp_block; + ASSERT_OK(block_mgr->GetNewBlock(tmp_client, NULL, &tmp_block)); + + // Initiate the write, repin the block, then immediately try to swap the buffer to + // the second block while the write is still in flight. + ASSERT_OK(blocks[0]->Unpin()); + bool pinned; + ASSERT_OK(blocks[0]->Pin(&pinned)); + ASSERT_TRUE(pinned); + ASSERT_OK(blocks[1]->Pin(&pinned, blocks[0], false)); + ASSERT_TRUE(pinned); + + blocks[1]->Delete(); + tmp_block->Delete(); + block_mgr->ClearReservations(tmp_client); + } + } +} + // Test that all APIs return cancelled after close. TEST_F(BufferedBlockMgrTest, Close) { int max_num_buffers = 5; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f4da9251/be/src/runtime/buffered-block-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-block-mgr.cc b/be/src/runtime/buffered-block-mgr.cc index 0fc617c..db62922 100644 --- a/be/src/runtime/buffered-block-mgr.cc +++ b/be/src/runtime/buffered-block-mgr.cc @@ -224,7 +224,8 @@ BufferedBlockMgr::BufferedBlockMgr(RuntimeState* state, TmpFileMgr* tmp_file_mgr is_cancelled_(false), writes_issued_(0), encryption_(FLAGS_disk_spill_encryption), - check_integrity_(FLAGS_disk_spill_encryption) { + check_integrity_(FLAGS_disk_spill_encryption), + debug_write_delay_ms_(0) { } Status BufferedBlockMgr::Create(RuntimeState* state, MemTracker* parent, @@ -490,14 +491,17 @@ Status BufferedBlockMgr::TransferBuffer(Block* dst, Block* src, bool unpin) { DCHECK(src != NULL); unique_lock<mutex> lock(lock_); - // First write out the src 
block. DCHECK(src->is_pinned_); DCHECK(!dst->is_pinned_); DCHECK(dst->buffer_desc_ == NULL); DCHECK_EQ(src->buffer_desc_->len, max_block_size_); + + // Ensure that there aren't any writes in flight for 'src'. + WaitForWrite(lock, src); src->is_pinned_ = false; if (unpin) { + // First write out the src block so we can grab its buffer. src->client_local_ = true; status = WriteUnpinnedBlock(src); if (!status.ok()) { @@ -506,9 +510,7 @@ Status BufferedBlockMgr::TransferBuffer(Block* dst, Block* src, bool unpin) { return status; } // Wait for the write to complete. - while (src->in_write_ && !is_cancelled_) { - src->write_complete_cv_.wait(lock); - } + WaitForWrite(lock, src); if (is_cancelled_) { // We can't be sure the write succeeded, so return the buffer to src. src->is_pinned_ = true; @@ -810,6 +812,13 @@ Status BufferedBlockMgr::WriteUnpinnedBlock(Block* block) { return Status::OK(); } +void BufferedBlockMgr::WaitForWrite(unique_lock<mutex>& lock, Block* block) { + DCHECK(!block->is_deleted_); + while (block->in_write_ && !is_cancelled_) { + block->write_complete_cv_.wait(lock); + } +} + Status BufferedBlockMgr::AllocateScratchSpace(int64_t block_size, TmpFileMgr::File** tmp_file, int64_t* file_offset) { // Assumes block manager lock is already taken. @@ -837,12 +846,18 @@ Status BufferedBlockMgr::AllocateScratchSpace(int64_t block_size, } void BufferedBlockMgr::WriteComplete(Block* block, const Status& write_status) { +#ifndef NDEBUG + if (debug_write_delay_ms_ > 0) { + usleep(static_cast<int64_t>(debug_write_delay_ms_) * 1000); + } +#endif Status status = Status::OK(); lock_guard<mutex> lock(lock_); outstanding_writes_counter_->Add(-1); DCHECK(Validate()) << endl << DebugInternal(); DCHECK(is_cancelled_ || block->in_write_) << "WriteComplete() for block not in write." 
<< endl << block->DebugString(); + DCHECK(block->buffer_desc_ != NULL); if (!block->client_local_) { DCHECK_GT(non_local_outstanding_writes_, 0) << block->DebugString(); --non_local_outstanding_writes_; @@ -872,24 +887,11 @@ void BufferedBlockMgr::WriteComplete(Block* block, const Status& write_status) { } else if (block->client_local_) { DCHECK(!block->is_deleted_) << "Client should be waiting. No one should have deleted this block."; - block->write_complete_cv_.notify_one(); } else { DCHECK_EQ(block->buffer_desc_->len, max_block_size_) << "Only io sized buffers should spill"; free_io_buffers_.Enqueue(block->buffer_desc_); - // Finish the DeleteBlock() work. - if (block->is_deleted_) { - block->buffer_desc_->block = NULL; - block->buffer_desc_ = NULL; - ReturnUnusedBlock(block); - block = NULL; - } - // Multiple threads may be waiting for the same block in FindBuffer(). Wake them - // all up. One thread will get this block, and the others will re-evaluate whether - // they should continue waiting and if another write needs to be initiated. - buffer_available_cv_.notify_all(); } - DCHECK(Validate()) << endl << DebugInternal(); if (!write_status.ok() || !status.ok() || is_cancelled_) { VLOG_FILE << "Query: " << query_id_ << ". Write did not complete successfully: " @@ -908,11 +910,29 @@ void BufferedBlockMgr::WriteComplete(Block* block, const Status& write_status) { VLOG_QUERY << "Query: " << query_id_ << " error while writing unpinned blocks."; if (state != NULL) state->LogError(status.msg()); } - // Set cancelled and wake up waiting threads if an error occurred. Note that in - // the case of client_local_, that thread was woken up above. + // Set cancelled. Threads waiting for a write will be woken up in the normal way when + // one of the writes they are waiting for completes. 
is_cancelled_ = true; - buffer_available_cv_.notify_all(); } + + // Notify any threads that may have been expecting to get block's buffer based on + // the value of 'non_local_outstanding_writes_'. Wake them all up. If we added + // a buffer to 'free_io_buffers_', one thread will get a buffer. All the others + // will re-evaluate whether they should continue waiting and if another write needs + // to be initiated. + if (!block->client_local_) buffer_available_cv_.notify_all(); + if (block->is_deleted_) { + // Finish the DeleteBlock() work. + block->buffer_desc_->block = NULL; + block->buffer_desc_ = NULL; + ReturnUnusedBlock(block); + block = NULL; + } else { + // Wake up the thread waiting on this block (if any). + block->write_complete_cv_.notify_one(); + } + + DCHECK(Validate()) << endl << DebugInternal(); } void BufferedBlockMgr::DeleteBlock(Block* block) { @@ -956,6 +976,7 @@ void BufferedBlockMgr::DeleteBlockLocked(const unique_lock<mutex>& lock, Block* } else { if (!free_io_buffers_.Contains(block->buffer_desc_)) { free_io_buffers_.Enqueue(block->buffer_desc_); + // Wake up one of the waiting threads, which will grab the buffer. buffer_available_cv_.notify_one(); } block->buffer_desc_->block = NULL; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f4da9251/be/src/runtime/buffered-block-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-block-mgr.h b/be/src/runtime/buffered-block-mgr.h index 58744ff..ac707bc 100644 --- a/be/src/runtime/buffered-block-mgr.h +++ b/be/src/runtime/buffered-block-mgr.h @@ -288,8 +288,9 @@ class BufferedBlockMgr { /// True if the block is deleted by the client. bool is_deleted_; - /// Condition variable for when there is a specific client waiting for this block. - /// Only used if client_local_ is true. + /// Condition variable to wait for the write to this block to finish. 
If 'in_write_' + /// is true, notify_one() will eventually be called on this condition variable. Only + /// one thread should wait on this cv at a time. /// TODO: Currently we use block_mgr_->lock_ for this condvar. There is no reason to /// use that lock_ that is already overloaded, see IMPALA-1883. boost::condition_variable write_complete_cv_; @@ -398,6 +399,8 @@ class BufferedBlockMgr { RuntimeProfile* profile() { return profile_.get(); } int writes_issued() const { return writes_issued_; } + void set_debug_write_delay_ms(int val) { debug_write_delay_ms_ = val; } + private: friend struct Client; @@ -443,11 +446,12 @@ class BufferedBlockMgr { /// cancellation. It should be called without the lock_ acquired. Status DeleteOrUnpinBlock(Block* block, bool unpin); - /// Transfers the buffer from 'src' to 'dst'. 'src' must be pinned. + /// Transfers the buffer from 'src' to 'dst'. 'src' must be pinned. If a write is + /// already in flight for 'src', this may block until that write completes. /// If unpin == false, 'src' is simply deleted. /// If unpin == true, 'src' is unpinned and it may block until the write of 'src' is - /// completed. In that case it will use the lock_ for the condvar. Thus, the lock_ - /// needs to not have been taken when this function is called. + /// completed. + /// The caller should not hold 'lock_'. Status TransferBuffer(Block* dst, Block* src, bool unpin); /// Returns the total number of unreserved buffers. This is the sum of unpinned, @@ -483,6 +487,10 @@ class BufferedBlockMgr { /// Issues the write for this block to the DiskIoMgr. Status WriteUnpinnedBlock(Block* block); + /// Wait until either the write for 'block' completes or the block mgr is cancelled. + /// 'lock_' must be held with 'lock'. + void WaitForWrite(boost::unique_lock<boost::mutex>& lock, Block* block); + /// Allocate block_size bytes in a temporary file. Try multiple disks if error occurs. /// Returns an error only if no temporary files are usable. 
Status AllocateScratchSpace(int64_t block_size, TmpFileMgr::File** tmp_file, @@ -549,7 +557,12 @@ class BufferedBlockMgr { /// This does not include client-local writes. int non_local_outstanding_writes_; - /// Signal availability of free buffers. + /// Signal availability of free buffers. Also signalled when a write completes for a + /// pinned block, in case another thread was expecting to obtain its buffer. If + /// 'non_local_outstanding_writes_' > 0, notify_all() will eventually be called on + /// this condition variable. To avoid free buffers accumulating while threads wait + /// on the cv, a woken thread must grab an available buffer (unless is_cancelled_ is + /// true at that time). boost::condition_variable buffer_available_cv_; /// All used or unused blocks allocated by the BufferedBlockMgr. @@ -667,6 +680,10 @@ class BufferedBlockMgr { /// and hence no real reason to keep this separate from encryption. When true, blocks /// will have an integrity check (SHA-256) performed after being read from disk. const bool check_integrity_; + + /// Debug option to delay write completion. + int debug_write_delay_ms_; + }; // class BufferedBlockMgr } // namespace impala.
