Repository: incubator-impala
Updated Branches:
  refs/heads/master 4592ed445 -> f11181cbe


IMPALA-2758: Remove BufferedTupleStream::GetRows

This patch removes BufferedTupleStream::GetRows(). That function pins
the stream and reads all of its rows into a single batch, which is a
poor API because it can produce an arbitrarily large row batch. With
this patch, the call sites pin the stream themselves and then call
GetNext() to retrieve one batch at a time.
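
For reference, the call-site pattern that replaces GetRows looks roughly
like the following sketch (condensed from the diff below; it assumes the
usual backend helpers such as RETURN_IF_ERROR and FOREACH_ROW, and
'build_stream' stands in for the spilled build stream being scanned):

  // Pin the whole stream so its pages stay in memory while we scan it.
  bool pinned;
  RETURN_IF_ERROR(build_stream->PinStream(&pinned));
  if (!pinned) return NullAwareAntiJoinError(build_stream);

  // Attach a read iterator; this should always succeed on a pinned stream.
  bool got_reservation;
  RETURN_IF_ERROR(build_stream->PrepareForRead(false, &got_reservation));
  DCHECK(got_reservation) << "Should have been pinned";

  // Read one normal-sized batch at a time instead of one giant batch.
  RowBatch build_batch(child(1)->row_desc(), state->batch_size(), mem_tracker());
  bool eos = false;
  do {
    RETURN_IF_ERROR(build_stream->GetNext(&build_batch, &eos));
    FOREACH_ROW(&build_batch, 0, iter) {
      // ... evaluate conjuncts against iter.Get() ...
    }
    build_batch.Reset();
    RETURN_IF_CANCELLED(state);
  } while (!eos);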

Testing: Passes existing tests. The test case that exercised GetRows is removed.

Change-Id: I3831c38994da2b69775a9809ff01de5d23584414
Reviewed-on: http://gerrit.cloudera.org:8080/8226
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/30629fde
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/30629fde
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/30629fde

Branch: refs/heads/master
Commit: 30629fdea555e1a243106380df22dce2ef1ce942
Parents: 4592ed4
Author: Tianyi Wang <[email protected]>
Authored: Tue Oct 3 17:09:22 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Fri Oct 27 22:15:47 2017 +0000

----------------------------------------------------------------------
 be/src/exec/partitioned-hash-join-node.cc    | 93 +++++++++++++----------
 be/src/exec/partitioned-hash-join-node.h     |  5 +-
 be/src/runtime/buffered-tuple-stream-test.cc | 17 -----
 be/src/runtime/buffered-tuple-stream.cc      | 24 ------
 be/src/runtime/buffered-tuple-stream.h       | 23 ++----
 5 files changed, 63 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/30629fde/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 94b49b3..77ed16b 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -64,6 +64,7 @@ PartitionedHashJoinNode::PartitionedHashJoinNode(
     num_probe_rows_partitioned_(NULL),
     null_aware_eval_timer_(NULL),
     state_(PARTITIONING_BUILD),
+    output_null_aware_probe_rows_running_(false),
     null_probe_output_idx_(-1),
     process_probe_batch_fn_(NULL),
     process_probe_batch_fn_level0_(NULL) {
@@ -210,7 +211,7 @@ Status PartitionedHashJoinNode::Reset(RuntimeState* state) {
   if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
     null_probe_output_idx_ = -1;
     matched_null_probe_.clear();
-    nulls_build_batch_.reset();
+    output_null_aware_probe_rows_running_ = false;
   }
   state_ = PARTITIONING_BUILD;
   ht_ctx_->set_level(0);
@@ -251,7 +252,7 @@ void PartitionedHashJoinNode::Close(RuntimeState* state) {
   if (is_closed()) return;
   if (ht_ctx_ != nullptr) ht_ctx_->Close(state);
   ht_ctx_.reset();
-  nulls_build_batch_.reset();
+  output_null_aware_probe_rows_running_ = false;
   output_unmatched_batch_.reset();
   output_unmatched_batch_iter_.reset();
   CloseAndDeletePartitions();
@@ -542,7 +543,7 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
         continue;
       }
 
-      if (nulls_build_batch_ != NULL) {
+      if (output_null_aware_probe_rows_running_) {
         RETURN_IF_ERROR(OutputNullAwareProbeRows(state, out_batch));
         if (out_batch->AtCapacity()) break;
         continue;
@@ -783,7 +784,7 @@ Status PartitionedHashJoinNode::OutputNullAwareNullProbe(RuntimeState* state,
     RowBatch* out_batch) {
   DCHECK(builder_->null_aware_partition() != NULL);
   DCHECK(null_aware_probe_partition_ != NULL);
-  DCHECK(nulls_build_batch_ == NULL);
+  DCHECK(!output_null_aware_probe_rows_running_);
   DCHECK_NE(probe_batch_pos_, -1);
 
   if (probe_batch_pos_ == probe_batch_->num_rows()) {
@@ -873,7 +874,7 @@ Status PartitionedHashJoinNode::InitNullProbeRows() {
 Status PartitionedHashJoinNode::PrepareNullAwarePartition() {
   DCHECK(builder_->null_aware_partition() != NULL);
   DCHECK(null_aware_probe_partition_ != NULL);
-  DCHECK(nulls_build_batch_ == NULL);
+  DCHECK(!output_null_aware_probe_rows_running_);
   DCHECK_EQ(probe_batch_pos_, -1);
   DCHECK_EQ(probe_batch_->num_rows(), 0);
 
@@ -884,15 +885,15 @@ Status PartitionedHashJoinNode::PrepareNullAwarePartition() {
     // There were no build rows. Nothing to do. Just prepare to output the null
     // probe rows.
     DCHECK_EQ(probe_stream->num_rows(), 0);
-    nulls_build_batch_.reset();
+    output_null_aware_probe_rows_running_ = false;
     RETURN_IF_ERROR(PrepareNullAwareNullProbe());
     return Status::OK();
   }
 
-  // Bring the entire spilled build stream into memory and read into a single batch.
-  bool got_rows;
-  RETURN_IF_ERROR(build_stream->GetRows(mem_tracker(), &nulls_build_batch_, &got_rows));
-  if (!got_rows) return NullAwareAntiJoinError(build_stream);
+  output_null_aware_probe_rows_running_ = true;
+  bool pinned;
+  RETURN_IF_ERROR(build_stream->PinStream(&pinned));
+  if (!pinned) return NullAwareAntiJoinError(build_stream);
 
   // Initialize the streams for read.
   bool got_read_buffer;
@@ -909,7 +910,7 @@ Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state,
     RowBatch* out_batch) {
   DCHECK(builder_->null_aware_partition() != NULL);
   DCHECK(null_aware_probe_partition_ != NULL);
-  DCHECK(nulls_build_batch_ != NULL);
+  DCHECK(output_null_aware_probe_rows_running_);
 
   ScalarExprEvaluator* const* join_conjunct_evals = other_join_conjunct_evals_.data();
   int num_join_conjuncts = other_join_conjuncts_.size();
@@ -928,7 +929,7 @@ Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state,
     if (probe_batch_->num_rows() == 0) {
       RETURN_IF_ERROR(EvaluateNullProbe(
             state, builder_->null_aware_partition()->build_rows()));
-      nulls_build_batch_.reset();
+      output_null_aware_probe_rows_running_ = false;
       RETURN_IF_ERROR(PrepareNullAwareNullProbe());
       return Status::OK();
     }
@@ -940,15 +941,26 @@ Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state,
     if (out_batch->AtCapacity()) break;
     TupleRow* probe_row = probe_batch_->GetRow(probe_batch_pos_);
     bool matched = false;
-    for (int i = 0; i < nulls_build_batch_->num_rows(); ++i) {
-      CreateOutputRow(semi_join_staging_row_, probe_row, nulls_build_batch_->GetRow(i));
-      if (ExecNode::EvalConjuncts(
-              join_conjunct_evals, num_join_conjuncts, semi_join_staging_row_)) {
-        matched = true;
-        break;
+    bool got_reservation;
+    BufferedTupleStream* null_build_stream =
+        builder_->null_aware_partition()->build_rows();
+    RETURN_IF_ERROR(null_build_stream->PrepareForRead(false, &got_reservation));
+    DCHECK(got_reservation) << "Should have been pinned";
+    RowBatch null_build_batch(child(1)->row_desc(), state->batch_size(), mem_tracker());
+    bool eos;
+    do {
+      RETURN_IF_ERROR(null_build_stream->GetNext(&null_build_batch, &eos));
+      FOREACH_ROW(&null_build_batch, 0, iter) {
+        CreateOutputRow(semi_join_staging_row_, probe_row, iter.Get());
+        if (ExecNode::EvalConjuncts(
+            join_conjunct_evals, num_join_conjuncts, semi_join_staging_row_)) {
+          matched = true;
+          break;
+        }
       }
-    }
-
+      null_build_batch.Reset();
+      RETURN_IF_CANCELLED(state);
+    } while (!matched && !eos);
     if (!matched) {
       TupleRow* out_row = out_batch->GetRow(out_batch->AddRow());
       out_batch->CopyRow(probe_row, out_row);
@@ -1034,14 +1046,6 @@ Status PartitionedHashJoinNode::EvaluateNullProbe(
     return Status::OK();
   }
   DCHECK_EQ(null_probe_rows_->num_rows(), matched_null_probe_.size());
-
-  // Bring the build side into memory, since we need to do a pass over it for
-  // every probe row.
-  bool got_rows;
-  scoped_ptr<RowBatch> build_rows;
-  RETURN_IF_ERROR(build->GetRows(mem_tracker(), &build_rows, &got_rows));
-  if (!got_rows) return NullAwareAntiJoinError(build);
-
   bool got_read_buffer;
   RETURN_IF_ERROR(null_probe_rows_->PrepareForRead(false, &got_read_buffer));
   DCHECK(got_read_buffer) << "Probe stream should always have a read or write iterator";
@@ -1049,6 +1053,11 @@ Status PartitionedHashJoinNode::EvaluateNullProbe(
   ScalarExprEvaluator* const* join_conjunct_evals = other_join_conjunct_evals_.data();
   int num_join_conjuncts = other_join_conjuncts_.size();
   RowBatch probe_batch(child(0)->row_desc(), runtime_state_->batch_size(), mem_tracker());
+
+  bool pinned;
+  RETURN_IF_ERROR(build->PinStream(&pinned));
+  if (!pinned) return NullAwareAntiJoinError(build);
+
   // For each probe row, iterate over all rows in the build table.
   SCOPED_TIMER(null_aware_eval_timer_);
   int64_t probe_row_idx = 0;
@@ -1059,18 +1068,24 @@ Status PartitionedHashJoinNode::EvaluateNullProbe(
       // This loop may run for a long time. Check for cancellation.
       RETURN_IF_CANCELLED(state);
       if (matched_null_probe_[probe_row_idx]) continue;
-      for (int j = 0; j < build_rows->num_rows(); ++j) {
-        // This loop may run for a long time if the number of build_rows is large.
-        // Periodically check for cancellation.
-        if (j % 1024 == 0) RETURN_IF_CANCELLED(state);
-        CreateOutputRow(
-            semi_join_staging_row_, probe_batch.GetRow(i), build_rows->GetRow(j));
-        if (ExecNode::EvalConjuncts(
-                join_conjunct_evals, num_join_conjuncts, semi_join_staging_row_)) {
-          matched_null_probe_[probe_row_idx] = true;
-          break;
+      bool got_reservation;
+      RETURN_IF_ERROR(build->PrepareForRead(false, &got_reservation));
+      DCHECK(got_reservation) << "Should have been pinned";
+      RowBatch build_batch(child(1)->row_desc(), state->batch_size(), mem_tracker());
+      bool build_eos;
+      do {
+        RETURN_IF_ERROR(build->GetNext(&build_batch, &build_eos));
+        FOREACH_ROW(&build_batch, 0, iter) {
+          CreateOutputRow(semi_join_staging_row_, probe_batch.GetRow(i), iter.Get());
+          if (ExecNode::EvalConjuncts(
+              join_conjunct_evals, num_join_conjuncts, semi_join_staging_row_)) {
+            matched_null_probe_[probe_row_idx] = true;
+            break;
+          }
         }
-      }
+        build_batch.Reset();
+        RETURN_IF_CANCELLED(state);
+      } while (!matched_null_probe_[probe_row_idx] && !build_eos);
     }
     probe_batch.Reset();
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/30629fde/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index 572be34..a0c03ef 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -478,9 +478,8 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// This list is populated at CleanUpHashPartitions().
   std::list<PhjBuilder::Partition*> output_build_partitions_;
 
-  /// Used while processing null_aware_partition_. It contains all the build tuple rows
-  /// with a NULL when evaluating the hash table expr.
-  boost::scoped_ptr<RowBatch> nulls_build_batch_;
+  /// Whether this join is in a state outputting rows from OutputNullAwareProbeRows().
+  bool output_null_aware_probe_rows_running_;
 
   /// Partition used if 'null_aware_' is set. During probing, rows from the probe
   /// side that did not have a match in the hash table are appended to this partition.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/30629fde/be/src/runtime/buffered-tuple-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index 08ce7c3..ef66824 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -914,23 +914,6 @@ TEST_F(SimpleTupleStreamTest, BigRowMemoryUse) {
   stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
 }
 
-// Test for IMPALA-3923: overflow of 32-bit int in GetRows().
-TEST_F(SimpleTupleStreamTest, TestGetRowsOverflow) {
-  Init(BUFFER_POOL_LIMIT);
-  BufferedTupleStream stream(runtime_state_, int_desc_, &client_, PAGE_LEN, PAGE_LEN);
-  ASSERT_OK(stream.Init(-1, true));
-
-  Status status;
-  // Add more rows than can be fit in a RowBatch (limited by its 32-bit row count).
-  // Actually adding the rows would take a very long time, so just set num_rows_.
-  // This puts the stream in an inconsistent state, but exercises the right code path.
-  stream.num_rows_ = 1L << 33;
-  bool got_rows;
-  scoped_ptr<RowBatch> overflow_batch;
-  ASSERT_FALSE(stream.GetRows(&tracker_, &overflow_batch, &got_rows).ok());
-  stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
-}
-
 // Test rows greater than the default page size. Also exercise the read/write
 // mode with large pages.
 TEST_F(SimpleTupleStreamTest, BigStringReadWrite) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/30629fde/be/src/runtime/buffered-tuple-stream.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc
index e0a14bb..f5668c7 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -677,30 +677,6 @@ void BufferedTupleStream::UnpinStream(UnpinMode mode) {
   CHECK_CONSISTENCY_FULL();
 }
 
-Status BufferedTupleStream::GetRows(
-    MemTracker* tracker, scoped_ptr<RowBatch>* batch, bool* got_rows) {
-  if (num_rows() > numeric_limits<int>::max()) {
-    // RowBatch::num_rows_ is a 32-bit int, avoid an overflow.
-    return Status(Substitute("Trying to read $0 rows into in-memory batch failed. Limit "
-                             "is $1",
-        num_rows(), numeric_limits<int>::max()));
-  }
-  RETURN_IF_ERROR(PinStream(got_rows));
-  if (!*got_rows) return Status::OK();
-  bool got_reservation;
-  RETURN_IF_ERROR(PrepareForRead(false, &got_reservation));
-  DCHECK(got_reservation) << "Stream was pinned";
-  batch->reset(new RowBatch(desc_, num_rows(), tracker));
-  bool eos = false;
-  // Loop until GetNext fills the entire batch. Each call can stop at page
-  // boundaries. We generally want it to stop, so that pages can be freed
-  // as we read. It is safe in this case because we pin the entire stream.
-  while (!eos) {
-    RETURN_IF_ERROR(GetNext(batch->get(), &eos));
-  }
-  return Status::OK();
-}
-
 Status BufferedTupleStream::GetNext(RowBatch* batch, bool* eos) {
   return GetNextInternal<false>(batch, eos, nullptr);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/30629fde/be/src/runtime/buffered-tuple-stream.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h
index dbf3faf..565b5fa 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -55,8 +55,7 @@ class TupleRow;
 /// To use write-only mode, PrepareForWrite() is called once and AddRow()/AddRowCustom*()
 /// are called repeatedly to initialize then advance a write iterator through the stream.
 /// Once the stream is fully written, it can be read back by calling PrepareForRead()
-/// then GetNext() repeatedly to advance a read iterator through the stream, or by
-/// calling GetRows() to get all of the rows at once.
+/// then GetNext() repeatedly to advance a read iterator through the stream.
 ///
 /// To use read/write mode, PrepareForReadWrite() is called once to initialize the read
 /// and write iterators. AddRow()/AddRowCustom*() then advance a write iterator through
@@ -124,7 +123,7 @@ class TupleRow;
 /// the tuple to be valid, we only need to update pointers to point to the var len data
 /// in the stream. These pointers need to be updated by the stream because a spilled
 /// page's data may be relocated to a different buffer. The pointers are updated lazily
-/// upon reading the stream via GetNext() or GetRows().
+/// upon reading the stream via GetNext().
 ///
 /// Example layout for a row with two non-nullable tuples ((1, "hello"), (2, "world"))
 /// with all var len data stored in the stream:
@@ -181,10 +180,10 @@ class TupleRow;
 ///
 /// Memory lifetime of rows read from stream:
 /// If the stream is pinned and delete on read is false, it is valid to access any tuples
-/// returned via GetNext() or GetRows() until the stream is unpinned. If the stream is
-/// unpinned or delete on read is true, then the batch returned from GetNext() may have
-/// the needs_deep_copy flag set, which means that any tuple memory returned so far from
-/// the stream may be freed on the next call to GetNext().
+/// returned via GetNext() until the stream is unpinned. If the stream is unpinned or
+/// delete on read is true, then the batch returned from GetNext() may have the
+/// needs_deep_copy flag set, which means that any tuple memory returned so far from the
+/// stream may be freed on the next call to GetNext().
 /// TODO: IMPALA-4179, instead of needs_deep_copy, attach the pages' buffers to the batch.
 ///
 /// Manual construction of rows with AddRowCustomBegin()/AddRowCustomEnd():
@@ -195,7 +194,7 @@ class TupleRow;
 /// AddRowCustomEnd() when done.
 ///
 /// If a caller constructs a tuple in this way, the caller can set the pointers and they
-/// will not be modified until the stream is read via GetNext() or GetRows().
+/// will not be modified until the stream is read via GetNext().
 /// TODO: IMPALA-5007: try to remove AddRowCustom*() by unifying with AddRow().
 ///
 /// TODO: we need to be able to do read ahead for pages. We need some way to indicate a
@@ -332,13 +331,6 @@ class BufferedTupleStream {
   Status GetNext(
       RowBatch* batch, bool* eos, std::vector<FlatRowPtr>* flat_rows) WARN_UNUSED_RESULT;
 
-  /// Returns all the rows in the stream in batch. This pins the entire stream in the
-  /// process. If the current unused reservation is not sufficient to pin the stream in
-  /// memory, this will try to increase the reservation. If that fails, 'got_rows' is set
-  /// to false.
-  Status GetRows(MemTracker* tracker, boost::scoped_ptr<RowBatch>* batch,
-      bool* got_rows) WARN_UNUSED_RESULT;
-
   /// Must be called once at the end to cleanup all resources. If 'batch' is non-NULL,
   /// attaches buffers from pinned pages that rows returned from GetNext() may reference.
   /// Otherwise deletes all pages. Does nothing if the stream was already closed. The
@@ -375,7 +367,6 @@ class BufferedTupleStream {
   friend class ArrayTupleStreamTest_TestArrayDeepCopy_Test;
   friend class ArrayTupleStreamTest_TestComputeRowSize_Test;
   friend class MultiNullableTupleStreamTest_TestComputeRowSize_Test;
-  friend class SimpleTupleStreamTest_TestGetRowsOverflow_Test;
 
   /// Wrapper around BufferPool::PageHandle that tracks additional info about the page.
   struct Page {
