IMPALA-4748: crash in TmpFileMgr when hitting process mem limit

The bug is that FileGroup didn't correctly handle its 'io_ctx_'
being asynchronously cancelled: it left the WriteHandle in an
invalid state. This could happen when the process memory limit
was exceeded.

I fixed this in two ways (either of which would be sufficient
to avoid this exact crash):
* Fix the error handling in TmpFileMgr so that things are left
  in a valid state on the error path.
* Stop DiskIoMgr from asynchronously cancelling I/O contexts
  with no associated MemTracker. The mem_limit check and error
  propagation are necessary when DiskIoMgr allocates memory
  on behalf of the client, but not when it allocates no memory
  for the client - in that case they just added a redundant
  error propagation mechanism.

Testing:
This scenario should no longer be possible for BufferedBlockMgr:
its I/O context has no associated MemTracker, so DiskIoMgr won't
cancel it. However, to test that errors on this path
are correctly handled, I added a simple unit test to TmpFileMgr
that forces cancellation of the I/O context.

Change-Id: Ib0a624212bc17f7824e6d14ad143c0d5894206f8
Reviewed-on: http://gerrit.cloudera.org:8080/5683
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/31025ab1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/31025ab1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/31025ab1

Branch: refs/heads/master
Commit: 31025ab10e838b8d575481c944a42ce10cf2ee41
Parents: 85edc15
Author: Tim Armstrong <[email protected]>
Authored: Tue Jan 10 17:40:50 2017 -0800
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Jan 19 05:35:23 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/disk-io-mgr.cc       | 11 ++++-------
 be/src/runtime/tmp-file-mgr-test.cc | 27 +++++++++++++++++++++++++++
 be/src/runtime/tmp-file-mgr.cc      | 25 +++++++++++++++++++++----
 be/src/runtime/tmp-file-mgr.h       | 14 +++++++++-----
 4 files changed, 61 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/31025ab1/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 16fd211..7a93ca2 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -870,19 +870,16 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* 
disk_queue, RequestRange** range,
     // same reader here (the reader is removed from the queue).  There can be
     // other disk threads operating on this reader in other functions though.
 
-    // We just picked a reader, check the mem limits. We need to fail the 
request if
-    // the reader exceeded its memory limit, or if we're over a global memory 
limit.
+    // We just picked a reader. Before we may allocate a buffer on its behalf, 
check that
+    // it has not exceeded any memory limits (e.g. the query or process limit).
     // TODO: once IMPALA-3200 is fixed, we should be able to remove the free 
lists and
     // move these memory limit checks to GetFreeBuffer().
     // Note that calling AnyLimitExceeded() can result in a call to 
GcIoBuffers().
-    bool any_io_mgr_limit_exceeded = 
free_buffer_mem_tracker_->AnyLimitExceeded();
     // TODO: IMPALA-3209: we should not force a reader over its memory limit by
     // pushing more buffers to it. Most readers can make progress and operate 
within
     // a fixed memory limit.
-    bool reader_limit_exceeded = (*request_context)->mem_tracker_ != NULL
-        ? (*request_context)->mem_tracker_->AnyLimitExceeded() : false;
-
-    if (any_io_mgr_limit_exceeded || reader_limit_exceeded) {
+    if ((*request_context)->mem_tracker_ != NULL
+        && (*request_context)->mem_tracker_->AnyLimitExceeded()) {
       (*request_context)->Cancel(Status::MemLimitExceeded());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/31025ab1/be/src/runtime/tmp-file-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-test.cc 
b/be/src/runtime/tmp-file-mgr-test.cc
index d34eb42..c4ddff6 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -116,6 +116,11 @@ class TmpFileMgrTest : public ::testing::Test {
     group->next_allocation_index_ = value;
   }
 
+  /// Helper to cancel the FileGroup DiskIoRequestContext.
+  static void CancelIoContext(TmpFileMgr::FileGroup* group) {
+    group->io_mgr_->CancelContext(group->io_ctx_);
+  }
+
   /// Helper to get the # of bytes allocated by the group. Validates that the 
sum across
   /// all files equals this total.
   static int64_t BytesAllocated(TmpFileMgr::FileGroup* group) {
@@ -409,6 +414,28 @@ TEST_F(TmpFileMgrTest, TestScratchRangeRecycling) {
   file_group.Close();
   test_env_->TearDownQueries();
 }
+
+// Regression test for IMPALA-4748, where hitting the process memory limit 
caused
+// internal invariants of TmpFileMgr to be broken on error path.
+TEST_F(TmpFileMgrTest, TestProcessMemLimitExceeded) {
+  TUniqueId id;
+  TmpFileMgr::FileGroup file_group(test_env_->tmp_file_mgr(), io_mgr(), 
profile_, id);
+
+  const int DATA_SIZE = 64;
+  vector<uint8_t> data(DATA_SIZE);
+
+  // Fake the asynchronous error from the process mem limit by cancelling the 
io context.
+  CancelIoContext(&file_group);
+
+  // After this error, writing via the file group should fail.
+  DiskIoMgr::WriteRange::WriteDoneCallback callback =
+      bind(mem_fn(&TmpFileMgrTest::SignalCallback), this, _1);
+  unique_ptr<TmpFileMgr::WriteHandle> handle;
+  Status status = file_group.Write(MemRange(data.data(), DATA_SIZE), callback, 
&handle);
+  EXPECT_EQ(TErrorCode::CANCELLED, status.code());
+  file_group.Close();
+  test_env_->TearDownRuntimeStates();
+}
 }
 
 int main(int argc, char** argv) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/31025ab1/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index d6f0010..932e9c1 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -516,21 +516,38 @@ Status TmpFileMgr::WriteHandle::Write(DiskIoMgr* io_mgr, 
DiskIoRequestContext* i
 
   if (FLAGS_disk_spill_encryption) RETURN_IF_ERROR(EncryptAndHash(buffer));
 
+  // Set all member variables before calling AddWriteRange(): after it 
succeeds,
+  // WriteComplete() may be called concurrently with the remainder of this 
function.
   file_ = file;
-  write_in_flight_ = true;
   write_range_.reset(
       new DiskIoMgr::WriteRange(file->path(), offset, file->AssignDiskQueue(), 
callback));
   write_range_->SetData(buffer.data(), buffer.len());
-  return io_mgr->AddWriteRange(io_ctx, write_range_.get());
+  write_in_flight_ = true;
+  Status status = io_mgr->AddWriteRange(io_ctx, write_range_.get());
+  if (!status.ok()) {
+    // The write will not be in flight if we returned with an error.
+    write_in_flight_ = false;
+    // We won't return this WriteHandle to the client of FileGroup, so it 
won't be
+    // cancelled in the normal way. Mark the handle as cancelled so it can be
+    // cleanly destroyed.
+    is_cancelled_ = true;
+    return status;
+  }
+  return Status::OK();
 }
 
 Status TmpFileMgr::WriteHandle::RetryWrite(
     DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* file, int64_t 
offset) {
   DCHECK(write_in_flight_);
   file_ = file;
-  write_in_flight_ = true;
   write_range_->SetRange(file->path(), offset, file->AssignDiskQueue());
-  return io_mgr->AddWriteRange(io_ctx, write_range_.get());
+  Status status = io_mgr->AddWriteRange(io_ctx, write_range_.get());
+  if (!status.ok()) {
+    // The write will not be in flight if we returned with an error.
+    write_in_flight_ = false;
+    return status;
+  }
+  return Status::OK();
 }
 
 void TmpFileMgr::WriteHandle::WriteComplete(const Status& write_status) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/31025ab1/be/src/runtime/tmp-file-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index 65476cb..409c7ce 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -260,7 +260,8 @@ class TmpFileMgr {
   /// Public methods of WriteHandle are safe to call concurrently from 
multiple threads.
   class WriteHandle {
    public:
-    // The write must be destroyed by FileGroup::DestroyWriteHandle().
+    /// The write must be destroyed by passing it to FileGroup - destroying it 
before
+    /// cancelling the write is an error.
     ~WriteHandle() {
       DCHECK(!write_in_flight_);
       DCHECK(is_cancelled_);
@@ -280,13 +281,16 @@ class TmpFileMgr {
 
     WriteHandle(RuntimeProfile::Counter* encryption_timer, WriteDoneCallback 
cb);
 
-    /// Starts a write of 'buffer' to 'offset' of 'file'.
+    /// Starts a write of 'buffer' to 'offset' of 'file'. 'write_in_flight_' 
must be false
+    /// before calling. After returning, 'write_in_flight_' is true on success 
or false on
+    /// failure and 'is_cancelled_' is set to true on failure.
     Status Write(DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* file,
         int64_t offset, MemRange buffer,
         DiskIoMgr::WriteRange::WriteDoneCallback callback) WARN_UNUSED_RESULT;
 
     /// Retry the write after the initial write failed with an error, instead 
writing to
-    /// 'offset' of 'file'.
+    /// 'offset' of 'file'. 'write_in_flight_' must be true before calling.
+    /// After returning, 'write_in_flight_' is true on success or false on 
failure.
     Status RetryWrite(DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* 
file,
         int64_t offset) WARN_UNUSED_RESULT;
 
@@ -333,10 +337,10 @@ class TmpFileMgr {
     /// acquiring other locks or invoking 'cb_'.
     boost::mutex write_state_lock_;
 
-    // True if the the write has been cancelled (but is not necessarily 
complete).
+    /// True if the write has been cancelled (but is not necessarily 
complete).
     bool is_cancelled_;
 
-    // True if a write is in flight.
+    /// True if a write is in flight.
     bool write_in_flight_;
 
     /// Signalled when the write completes and 'write_in_flight_' becomes 
false, before

Reply via email to