IMPALA-4748: crash in TmpFileMgr when hitting process mem limit

The bug is that FileGroup didn't correctly handle its 'io_ctx_' being asynchronously cancelled: it left the WriteHandle in an invalid state. This could happen when the process memory limit was exceeded.
I fixed this in two ways (either of which would be sufficient to avoid this exact crash):
* Fix the error handling in TmpFileMgr so that things are left in a valid state on the error path.
* Stop DiskIoMgr from asynchronously cancelling I/O contexts with no associated MemTracker. The mem_limit check and error propagation are necessary when DiskIoMgr will allocate memory on behalf of the client, but not when it is not allocating memory for the client - in that case they just added a redundant error propagation mechanism.

Testing:
This scenario should no longer be possible for BufferedBlockMgr: its I/O context has no associated MemTracker, so DiskIoMgr won't cancel it. However, to test that errors on this path are correctly handled, I added a simple unit test to TmpFileMgr that forces cancellation of the I/O context.

Change-Id: Ib0a624212bc17f7824e6d14ad143c0d5894206f8
Reviewed-on: http://gerrit.cloudera.org:8080/5683
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/31025ab1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/31025ab1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/31025ab1

Branch: refs/heads/master
Commit: 31025ab10e838b8d575481c944a42ce10cf2ee41
Parents: 85edc15
Author: Tim Armstrong <[email protected]>
Authored: Tue Jan 10 17:40:50 2017 -0800
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Jan 19 05:35:23 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/disk-io-mgr.cc       | 11 ++++------- 
 be/src/runtime/tmp-file-mgr-test.cc | 27 +++++++++++++++++++++++++++ 
 be/src/runtime/tmp-file-mgr.cc      | 25 +++++++++++++++++++++---- 
 be/src/runtime/tmp-file-mgr.h       | 14 +++++++++----- 
 4 files changed, 61 insertions(+), 16 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/31025ab1/be/src/runtime/disk-io-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc index 16fd211..7a93ca2 100644 --- a/be/src/runtime/disk-io-mgr.cc +++ b/be/src/runtime/disk-io-mgr.cc @@ -870,19 +870,16 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range, // same reader here (the reader is removed from the queue). There can be // other disk threads operating on this reader in other functions though. - // We just picked a reader, check the mem limits. We need to fail the request if - // the reader exceeded its memory limit, or if we're over a global memory limit. + // We just picked a reader. Before we may allocate a buffer on its behalf, check that + // it has not exceeded any memory limits (e.g. the query or process limit). // TODO: once IMPALA-3200 is fixed, we should be able to remove the free lists and // move these memory limit checks to GetFreeBuffer(). // Note that calling AnyLimitExceeded() can result in a call to GcIoBuffers(). - bool any_io_mgr_limit_exceeded = free_buffer_mem_tracker_->AnyLimitExceeded(); // TODO: IMPALA-3209: we should not force a reader over its memory limit by // pushing more buffers to it. Most readers can make progress and operate within // a fixed memory limit. - bool reader_limit_exceeded = (*request_context)->mem_tracker_ != NULL - ? 
(*request_context)->mem_tracker_->AnyLimitExceeded() : false; - - if (any_io_mgr_limit_exceeded || reader_limit_exceeded) { + if ((*request_context)->mem_tracker_ != NULL + && (*request_context)->mem_tracker_->AnyLimitExceeded()) { (*request_context)->Cancel(Status::MemLimitExceeded()); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/31025ab1/be/src/runtime/tmp-file-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc index d34eb42..c4ddff6 100644 --- a/be/src/runtime/tmp-file-mgr-test.cc +++ b/be/src/runtime/tmp-file-mgr-test.cc @@ -116,6 +116,11 @@ class TmpFileMgrTest : public ::testing::Test { group->next_allocation_index_ = value; } + /// Helper to cancel the FileGroup DiskIoRequestContext. + static void CancelIoContext(TmpFileMgr::FileGroup* group) { + group->io_mgr_->CancelContext(group->io_ctx_); + } + /// Helper to get the # of bytes allocated by the group. Validates that the sum across /// all files equals this total. static int64_t BytesAllocated(TmpFileMgr::FileGroup* group) { @@ -409,6 +414,28 @@ TEST_F(TmpFileMgrTest, TestScratchRangeRecycling) { file_group.Close(); test_env_->TearDownQueries(); } + +// Regression test for IMPALA-4748, where hitting the process memory limit caused +// internal invariants of TmpFileMgr to be broken on error path. +TEST_F(TmpFileMgrTest, TestProcessMemLimitExceeded) { + TUniqueId id; + TmpFileMgr::FileGroup file_group(test_env_->tmp_file_mgr(), io_mgr(), profile_, id); + + const int DATA_SIZE = 64; + vector<uint8_t> data(DATA_SIZE); + + // Fake the asynchronous error from the process mem limit by cancelling the io context. + CancelIoContext(&file_group); + + // After this error, writing via the file group should fail. 
+ DiskIoMgr::WriteRange::WriteDoneCallback callback = + bind(mem_fn(&TmpFileMgrTest::SignalCallback), this, _1); + unique_ptr<TmpFileMgr::WriteHandle> handle; + Status status = file_group.Write(MemRange(data.data(), DATA_SIZE), callback, &handle); + EXPECT_EQ(TErrorCode::CANCELLED, status.code()); + file_group.Close(); + test_env_->TearDownRuntimeStates(); +} } int main(int argc, char** argv) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/31025ab1/be/src/runtime/tmp-file-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc index d6f0010..932e9c1 100644 --- a/be/src/runtime/tmp-file-mgr.cc +++ b/be/src/runtime/tmp-file-mgr.cc @@ -516,21 +516,38 @@ Status TmpFileMgr::WriteHandle::Write(DiskIoMgr* io_mgr, DiskIoRequestContext* i if (FLAGS_disk_spill_encryption) RETURN_IF_ERROR(EncryptAndHash(buffer)); + // Set all member variables before calling AddWriteRange(): after it succeeds, + // WriteComplete() may be called concurrently with the remainder of this function. file_ = file; - write_in_flight_ = true; write_range_.reset( new DiskIoMgr::WriteRange(file->path(), offset, file->AssignDiskQueue(), callback)); write_range_->SetData(buffer.data(), buffer.len()); - return io_mgr->AddWriteRange(io_ctx, write_range_.get()); + write_in_flight_ = true; + Status status = io_mgr->AddWriteRange(io_ctx, write_range_.get()); + if (!status.ok()) { + // The write will not be in flight if we returned with an error. + write_in_flight_ = false; + // We won't return this WriteHandle to the client of FileGroup, so it won't be + // cancelled in the normal way. Mark the handle as cancelled so it can be + // cleanly destroyed. 
+ is_cancelled_ = true; + return status; + } + return Status::OK(); } Status TmpFileMgr::WriteHandle::RetryWrite( DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* file, int64_t offset) { DCHECK(write_in_flight_); file_ = file; - write_in_flight_ = true; write_range_->SetRange(file->path(), offset, file->AssignDiskQueue()); - return io_mgr->AddWriteRange(io_ctx, write_range_.get()); + Status status = io_mgr->AddWriteRange(io_ctx, write_range_.get()); + if (!status.ok()) { + // The write will not be in flight if we returned with an error. + write_in_flight_ = false; + return status; + } + return Status::OK(); } void TmpFileMgr::WriteHandle::WriteComplete(const Status& write_status) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/31025ab1/be/src/runtime/tmp-file-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h index 65476cb..409c7ce 100644 --- a/be/src/runtime/tmp-file-mgr.h +++ b/be/src/runtime/tmp-file-mgr.h @@ -260,7 +260,8 @@ class TmpFileMgr { /// Public methods of WriteHandle are safe to call concurrently from multiple threads. class WriteHandle { public: - // The write must be destroyed by FileGroup::DestroyWriteHandle(). + /// The write must be destroyed by passing it to FileGroup - destroying it before + /// cancelling the write is an error. ~WriteHandle() { DCHECK(!write_in_flight_); DCHECK(is_cancelled_); @@ -280,13 +281,16 @@ class TmpFileMgr { WriteHandle(RuntimeProfile::Counter* encryption_timer, WriteDoneCallback cb); - /// Starts a write of 'buffer' to 'offset' of 'file'. + /// Starts a write of 'buffer' to 'offset' of 'file'. 'write_in_flight_' must be false + /// before calling. After returning, 'write_in_flight_' is true on success or false on + /// failure and 'is_cancelled_' is set to true on failure. 
Status Write(DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* file, int64_t offset, MemRange buffer, DiskIoMgr::WriteRange::WriteDoneCallback callback) WARN_UNUSED_RESULT; /// Retry the write after the initial write failed with an error, instead writing to - /// 'offset' of 'file'. + /// 'offset' of 'file'. 'write_in_flight_' must be true before calling. + /// After returning, 'write_in_flight_' is true on success or false on failure. Status RetryWrite(DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* file, int64_t offset) WARN_UNUSED_RESULT; @@ -333,10 +337,10 @@ class TmpFileMgr { /// acquiring other locks or invoking 'cb_'. boost::mutex write_state_lock_; - // True if the the write has been cancelled (but is not necessarily complete). + /// True if the the write has been cancelled (but is not necessarily complete). bool is_cancelled_; - // True if a write is in flight. + /// True if a write is in flight. bool write_in_flight_; /// Signalled when the write completes and 'write_in_flight_' becomes false, before
