IMPALA-3276: consistently handle pin failure in BTS::PrepareForRead()

BufferedTupleStream::PrepareForRead() can fail to pin its initial read
buffer in every scenario except when the stream is already fully pinned.
The underlying problem is that reservations in the BufferedBlockMgr
cannot be relied upon, because they can be overcommitted.

This patch makes the got_buffer argument of PrepareForRead() mandatory,
so that every caller must handle pin failure, and updates all call sites.

Change-Id: Iacf1aba50d79b01acdc7269dc23f07b8c19a151a
Reviewed-on: http://gerrit.cloudera.org:8080/2684
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/36c294b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/36c294b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/36c294b5

Branch: refs/heads/master
Commit: 36c294b55e64b6b9dd1c0fca30205a05db24b120
Parents: 92fafa1
Author: Tim Armstrong <[email protected]>
Authored: Wed Mar 30 13:52:44 2016 -0700
Committer: Tim Armstrong <[email protected]>
Committed: Tue Apr 12 14:03:44 2016 -0700

----------------------------------------------------------------------
 be/src/exec/analytic-eval-node.cc            | 10 +++++-
 be/src/exec/partitioned-hash-join-node.cc    | 41 ++++++++++++++++++++---
 be/src/runtime/buffered-tuple-stream-test.cc | 24 +++++++++----
 be/src/runtime/buffered-tuple-stream.cc      |  7 ++--
 be/src/runtime/buffered-tuple-stream.h       |  7 ++--
 5 files changed, 70 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/36c294b5/be/src/exec/analytic-eval-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.cc 
b/be/src/exec/analytic-eval-node.cc
index 94725ae..9442a78 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -190,7 +190,15 @@ Status AnalyticEvalNode::Open(RuntimeState* state) {
       state->block_mgr(), client_, false /* use_initial_small_buffers */,
       true /* read_write */);
   RETURN_IF_ERROR(input_stream_->Init(id(), runtime_profile(), true));
-  RETURN_IF_ERROR(input_stream_->PrepareForRead(true));
+  bool got_read_buffer;
+  RETURN_IF_ERROR(input_stream_->PrepareForRead(true, &got_read_buffer));
+  if (!got_read_buffer) {
+    Status status = Status::MemLimitExceeded();
+    status.AddDetail("Failed to acquire initial read buffer for analytic 
function "
+        "evaluation. Reducing query concurrency or increasing the memory limit 
may "
+        "help this query to complete successfully.");
+    return status;
+  }
 
   DCHECK_EQ(evaluators_.size(), fn_ctxs_.size());
   for (int i = 0; i < evaluators_.size(); ++i) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/36c294b5/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc 
b/be/src/exec/partitioned-hash-join-node.cc
index 151b0c6..b22dff1 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -41,6 +41,10 @@
 DEFINE_bool(enable_phj_probe_side_filtering, true,
     "Enables pushing PHJ build side filters to probe side");
 
+const string PREPARE_FOR_READ_FAILED_ERROR_MSG = "Failed to acquire initial 
read buffer "
+    "for stream in hash join node $0. Reducing query concurrency or increasing 
the "
+    "memory limit may help this query to complete successfully.";
+
 using namespace impala;
 using namespace llvm;
 using namespace strings;
@@ -433,7 +437,9 @@ Status 
PartitionedHashJoinNode::Partition::BuildHashTableInternal(
   // We got the buffers we think we will need, try to build the hash table.
   RETURN_IF_ERROR(build_rows_->PinStream(false, built));
   if (!*built) return Status::OK();
-  RETURN_IF_ERROR(build_rows_->PrepareForRead(false));
+  bool got_read_buffer;
+  RETURN_IF_ERROR(build_rows_->PrepareForRead(false, &got_read_buffer));
+  DCHECK(got_read_buffer) << "Stream was already pinned.";
 
   RowBatch batch(parent_->child(1)->row_desc(), state->batch_size(),
       parent_->mem_tracker());
@@ -623,7 +629,14 @@ Status 
PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state, int level
   if (input_partition_ != NULL) {
     DCHECK(input_partition_->build_rows() != NULL);
     DCHECK_EQ(input_partition_->build_rows()->blocks_pinned(), 0) << 
NodeDebugString();
-    RETURN_IF_ERROR(input_partition_->build_rows()->PrepareForRead(true));
+    bool got_read_buffer;
+    RETURN_IF_ERROR(
+        input_partition_->build_rows()->PrepareForRead(true, 
&got_read_buffer));
+    if (!got_read_buffer) {
+      Status status = Status::MemLimitExceeded();
+      status.AddDetail(Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
+      return status;
+    }
   }
 
   for (int i = 0; i < PARTITION_FANOUT; ++i) {
@@ -788,7 +801,13 @@ Status 
PartitionedHashJoinNode::PrepareNextPartition(RuntimeState* state) {
   DCHECK(input_partition_->is_spilled());
 
   // Reserve one buffer to read the probe side.
-  RETURN_IF_ERROR(input_partition_->probe_rows()->PrepareForRead(true));
+  bool got_read_buffer;
+  RETURN_IF_ERROR(input_partition_->probe_rows()->PrepareForRead(true, 
&got_read_buffer));
+  if (!got_read_buffer) {
+    Status status = Status::MemLimitExceeded();
+    status.AddDetail(Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
+    return status;
+  }
   ht_ctx_->set_level(input_partition_->level_);
 
   int64_t mem_limit = mem_tracker()->SpareCapacity();
@@ -1045,7 +1064,13 @@ void 
PartitionedHashJoinNode::OutputUnmatchedBuild(RowBatch* out_batch) {
 
 Status PartitionedHashJoinNode::PrepareNullAwareNullProbe() {
   DCHECK_EQ(null_probe_output_idx_, -1);
-  RETURN_IF_ERROR(null_probe_rows_->PrepareForRead(true));
+  bool got_read_buffer;
+  RETURN_IF_ERROR(null_probe_rows_->PrepareForRead(true, &got_read_buffer));
+  if (!got_read_buffer) {
+    Status status = Status::MemLimitExceeded();
+    status.AddDetail(Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
+    return status;
+  }
   DCHECK_EQ(probe_batch_->num_rows(), 0);
   probe_batch_pos_ = 0;
   null_probe_output_idx_ = 0;
@@ -1120,7 +1145,13 @@ Status 
PartitionedHashJoinNode::PrepareNullAwarePartition() {
   if (!got_rows) return NullAwareAntiJoinError(true);
 
   // Initialize the streams for read.
-  RETURN_IF_ERROR(probe_stream->PrepareForRead(true));
+  bool got_read_buffer;
+  RETURN_IF_ERROR(probe_stream->PrepareForRead(true, &got_read_buffer));
+  if (!got_read_buffer) {
+    Status status = Status::MemLimitExceeded();
+    status.AddDetail(Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
+    return status;
+  }
   probe_batch_pos_ = 0;
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/36c294b5/be/src/runtime/buffered-tuple-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc 
b/be/src/runtime/buffered-tuple-stream-test.cc
index 0323cb8..c1633a9 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -314,7 +314,9 @@ class SimpleTupleStreamTest : public testing::Test {
       batch->Reset();
     }
 
-    ASSERT_OK(stream.PrepareForRead(false));
+    bool got_read_buffer;
+    ASSERT_OK(stream.PrepareForRead(false, &got_read_buffer));
+    ASSERT_TRUE(got_read_buffer);
 
     // Read all the rows back
     vector<T> results;
@@ -333,7 +335,9 @@ class SimpleTupleStreamTest : public testing::Test {
           client_, small_buffers == 0,  // initial small buffers
           true); // read_write
       ASSERT_OK(stream.Init(-1, NULL, true));
-      ASSERT_OK(stream.PrepareForRead(true));
+      bool got_read_buffer;
+      ASSERT_OK(stream.PrepareForRead(true, &got_read_buffer));
+      ASSERT_TRUE(got_read_buffer);
       if (unpin_stream) ASSERT_OK(stream.UnpinStream());
 
       vector<int> results;
@@ -575,7 +579,9 @@ void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data) {
   int read_iters = 3;
   for (int i = 0; i < read_iters; ++i) {
     bool delete_on_read = i == read_iters - 1;
-    ASSERT_OK(stream.PrepareForRead(delete_on_read));
+    bool got_read_buffer;
+    ASSERT_OK(stream.PrepareForRead(delete_on_read, &got_read_buffer));
+    ASSERT_TRUE(got_read_buffer);
 
     if (varlen_data) {
       vector<StringValue> results;
@@ -690,7 +696,9 @@ TEST_F(SimpleTupleStreamTest, StringsOutsideStream) {
   for (int delete_on_read = 0; delete_on_read <= 1; ++delete_on_read) {
     // Keep stream in memory and test we can read ok.
     vector<StringValue> results;
-    stream.PrepareForRead(delete_on_read);
+    bool got_read_buffer;
+    ASSERT_OK(stream.PrepareForRead(delete_on_read, &got_read_buffer));
+    ASSERT_TRUE(got_read_buffer);
     ReadValues(&stream, string_desc_, &results);
     VerifyResults<StringValue>(*string_desc_, results, rows_added, false);
   }
@@ -860,7 +868,9 @@ TEST_F(MultiTupleStreamTest, MultiTupleAllocateRow) {
   for (int i = 0; i < 3; ++i) {
     bool delete_on_read = i == 2;
     vector<StringValue> results;
-    stream.PrepareForRead(delete_on_read);
+    bool got_read_buffer;
+    stream.PrepareForRead(delete_on_read, &got_read_buffer);
+    ASSERT_TRUE(got_read_buffer);
     ReadValues(&stream, string_desc_, &results);
     VerifyResults<StringValue>(*string_desc_, results, rows_added, false);
   }
@@ -1021,7 +1031,9 @@ TEST_F(ArrayTupleStreamTest, TestArrayDeepCopy) {
   }
 
   // Read back and verify data.
-  stream.PrepareForRead(false);
+  bool got_read_buffer;
+  stream.PrepareForRead(false, &got_read_buffer);
+  ASSERT_TRUE(got_read_buffer);
   strings_index = 0;
   array_len_index = 0;
   bool eos = false;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/36c294b5/be/src/runtime/buffered-tuple-stream.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.cc 
b/be/src/runtime/buffered-tuple-stream.cc
index 6a487bc..86eedc4 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -398,7 +398,6 @@ Status BufferedTupleStream::PrepareForRead(bool 
delete_on_read, bool* got_buffer
       bool current_pinned;
       RETURN_IF_ERROR((*it)->Pin(&current_pinned));
       if (!current_pinned) {
-        DCHECK(got_buffer != NULL) << "Should have reserved enough blocks";
         *got_buffer = false;
         return Status::OK();
       }
@@ -419,7 +418,7 @@ Status BufferedTupleStream::PrepareForRead(bool 
delete_on_read, bool* got_buffer
   rows_returned_ = 0;
   read_block_idx_ = 0;
   delete_on_read_ = delete_on_read;
-  if (got_buffer != NULL) *got_buffer = true;
+  *got_buffer = true;
   return Status::OK();
 }
 
@@ -502,7 +501,9 @@ int BufferedTupleStream::ComputeNumNullIndicatorBytes(int 
block_size) const {
 Status BufferedTupleStream::GetRows(scoped_ptr<RowBatch>* batch, bool* 
got_rows) {
   RETURN_IF_ERROR(PinStream(false, got_rows));
   if (!*got_rows) return Status::OK();
-  RETURN_IF_ERROR(PrepareForRead(false));
+  bool got_read_buffer;
+  RETURN_IF_ERROR(PrepareForRead(false, &got_read_buffer));
+  DCHECK(got_read_buffer) << "Stream was pinned";
   batch->reset(
       new RowBatch(desc_, num_rows(), 
block_mgr_->get_tracker(block_mgr_client_)));
   bool eos = false;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/36c294b5/be/src/runtime/buffered-tuple-stream.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.h 
b/be/src/runtime/buffered-tuple-stream.h
index 75a69be..23c09a9 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -256,10 +256,9 @@ class BufferedTupleStream {
   /// begin reading. Otherwise this must be called after the last AddRow() and
   /// before GetNext().
   /// delete_on_read: Blocks are deleted after they are read.
-  /// If got_buffer is NULL, this function will fail (with a bad status) if no 
buffer
-  /// is available. If got_buffer is non-null, this function will not fail on 
OOM and
-  /// *got_buffer is true if a buffer was pinned.
-  Status PrepareForRead(bool delete_on_read, bool* got_buffer = NULL);
+  /// got_buffer: set to true if the first read block was successfully pinned, 
or
+  ///     false if the block could not be pinned and no error was encountered.
+  Status PrepareForRead(bool delete_on_read, bool* got_buffer);
 
   /// Pins all blocks in this stream and switches to pinned mode.
   /// If there is not enough memory, *pinned is set to false and the stream is 
unmodified.

Reply via email to