Repository: incubator-impala
Updated Branches:
  refs/heads/master 4592ed445 -> f11181cbe
IMPALA-2758: Remove BufferedTupleStream::GetRows

This patch removes BufferedTupleStream::GetRows. This function pins a stream
and reads all the rows into a single batch. It is not a good API since it
creates an arbitrarily large row batch. In this patch the call sites pin the
stream and then directly use GetNext to retrieve a single batch at a time.

Testing: It passes existing tests. A test case for GetRows is removed.

Change-Id: I3831c38994da2b69775a9809ff01de5d23584414
Reviewed-on: http://gerrit.cloudera.org:8080/8226
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/30629fde
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/30629fde
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/30629fde

Branch: refs/heads/master
Commit: 30629fdea555e1a243106380df22dce2ef1ce942
Parents: 4592ed4
Author: Tianyi Wang <[email protected]>
Authored: Tue Oct 3 17:09:22 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Fri Oct 27 22:15:47 2017 +0000

----------------------------------------------------------------------
 be/src/exec/partitioned-hash-join-node.cc    | 93 +++++++++++++----------
 be/src/exec/partitioned-hash-join-node.h     |  5 +-
 be/src/runtime/buffered-tuple-stream-test.cc | 17 -----
 be/src/runtime/buffered-tuple-stream.cc      | 24 ------
 be/src/runtime/buffered-tuple-stream.h       | 23 ++----
 5 files changed, 63 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/30629fde/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 94b49b3..77ed16b 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -64,6 +64,7 @@ PartitionedHashJoinNode::PartitionedHashJoinNode(
     num_probe_rows_partitioned_(NULL),
     null_aware_eval_timer_(NULL),
     state_(PARTITIONING_BUILD),
+    output_null_aware_probe_rows_running_(false),
     null_probe_output_idx_(-1),
     process_probe_batch_fn_(NULL),
     process_probe_batch_fn_level0_(NULL) {
@@ -210,7 +211,7 @@ Status PartitionedHashJoinNode::Reset(RuntimeState* state) {
   if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
     null_probe_output_idx_ = -1;
     matched_null_probe_.clear();
-    nulls_build_batch_.reset();
+    output_null_aware_probe_rows_running_ = false;
   }
   state_ = PARTITIONING_BUILD;
   ht_ctx_->set_level(0);
@@ -251,7 +252,7 @@ void PartitionedHashJoinNode::Close(RuntimeState* state) {
   if (is_closed()) return;
   if (ht_ctx_ != nullptr) ht_ctx_->Close(state);
   ht_ctx_.reset();
-  nulls_build_batch_.reset();
+  output_null_aware_probe_rows_running_ = false;
   output_unmatched_batch_.reset();
   output_unmatched_batch_iter_.reset();
   CloseAndDeletePartitions();
@@ -542,7 +543,7 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
       continue;
     }
 
-    if (nulls_build_batch_ != NULL) {
+    if (output_null_aware_probe_rows_running_) {
       RETURN_IF_ERROR(OutputNullAwareProbeRows(state, out_batch));
       if (out_batch->AtCapacity()) break;
       continue;
@@ -783,7 +784,7 @@ Status PartitionedHashJoinNode::OutputNullAwareNullProbe(RuntimeState* state,
     RowBatch* out_batch) {
   DCHECK(builder_->null_aware_partition() != NULL);
   DCHECK(null_aware_probe_partition_ != NULL);
-  DCHECK(nulls_build_batch_ == NULL);
+  DCHECK(!output_null_aware_probe_rows_running_);
   DCHECK_NE(probe_batch_pos_, -1);
 
   if (probe_batch_pos_ == probe_batch_->num_rows()) {
@@ -873,7 +874,7 @@ Status PartitionedHashJoinNode::InitNullProbeRows() {
 Status PartitionedHashJoinNode::PrepareNullAwarePartition() {
   DCHECK(builder_->null_aware_partition() != NULL);
   DCHECK(null_aware_probe_partition_ != NULL);
-  DCHECK(nulls_build_batch_ == NULL);
+  DCHECK(!output_null_aware_probe_rows_running_);
   DCHECK_EQ(probe_batch_pos_, -1);
   DCHECK_EQ(probe_batch_->num_rows(), 0);
 
@@ -884,15 +885,15 @@ Status PartitionedHashJoinNode::PrepareNullAwarePartition() {
     // There were no build rows. Nothing to do. Just prepare to output the null
     // probe rows.
     DCHECK_EQ(probe_stream->num_rows(), 0);
-    nulls_build_batch_.reset();
+    output_null_aware_probe_rows_running_ = false;
    RETURN_IF_ERROR(PrepareNullAwareNullProbe());
     return Status::OK();
   }
 
-  // Bring the entire spilled build stream into memory and read into a single batch.
-  bool got_rows;
-  RETURN_IF_ERROR(build_stream->GetRows(mem_tracker(), &nulls_build_batch_, &got_rows));
-  if (!got_rows) return NullAwareAntiJoinError(build_stream);
+  output_null_aware_probe_rows_running_ = true;
+  bool pinned;
+  RETURN_IF_ERROR(build_stream->PinStream(&pinned));
+  if (!pinned) return NullAwareAntiJoinError(build_stream);
 
   // Initialize the streams for read.
   bool got_read_buffer;
@@ -909,7 +910,7 @@ Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state,
     RowBatch* out_batch) {
   DCHECK(builder_->null_aware_partition() != NULL);
   DCHECK(null_aware_probe_partition_ != NULL);
-  DCHECK(nulls_build_batch_ != NULL);
+  DCHECK(output_null_aware_probe_rows_running_);
 
   ScalarExprEvaluator* const* join_conjunct_evals = other_join_conjunct_evals_.data();
   int num_join_conjuncts = other_join_conjuncts_.size();
@@ -928,7 +929,7 @@ Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state,
     if (probe_batch_->num_rows() == 0) {
      RETURN_IF_ERROR(EvaluateNullProbe(
           state, builder_->null_aware_partition()->build_rows()));
-      nulls_build_batch_.reset();
+      output_null_aware_probe_rows_running_ = false;
       RETURN_IF_ERROR(PrepareNullAwareNullProbe());
       return Status::OK();
     }
@@ -940,15 +941,26 @@ Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state,
     if (out_batch->AtCapacity()) break;
     TupleRow* probe_row = probe_batch_->GetRow(probe_batch_pos_);
     bool matched = false;
-    for (int i = 0; i < nulls_build_batch_->num_rows(); ++i) {
-      CreateOutputRow(semi_join_staging_row_, probe_row, nulls_build_batch_->GetRow(i));
-      if (ExecNode::EvalConjuncts(
-              join_conjunct_evals, num_join_conjuncts, semi_join_staging_row_)) {
-        matched = true;
-        break;
+    bool got_reservation;
+    BufferedTupleStream* null_build_stream =
+        builder_->null_aware_partition()->build_rows();
+    RETURN_IF_ERROR(null_build_stream->PrepareForRead(false, &got_reservation));
+    DCHECK(got_reservation) << "Should have been pinned";
+    RowBatch null_build_batch(child(1)->row_desc(), state->batch_size(), mem_tracker());
+    bool eos;
+    do {
+      RETURN_IF_ERROR(null_build_stream->GetNext(&null_build_batch, &eos));
+      FOREACH_ROW(&null_build_batch, 0, iter) {
+        CreateOutputRow(semi_join_staging_row_, probe_row, iter.Get());
+        if (ExecNode::EvalConjuncts(
+                join_conjunct_evals, num_join_conjuncts, semi_join_staging_row_)) {
+          matched = true;
+          break;
+        }
       }
-    }
-
+      null_build_batch.Reset();
+      RETURN_IF_CANCELLED(state);
+    } while (!matched && !eos);
     if (!matched) {
       TupleRow* out_row = out_batch->GetRow(out_batch->AddRow());
       out_batch->CopyRow(probe_row, out_row);
@@ -1034,14 +1046,6 @@ Status PartitionedHashJoinNode::EvaluateNullProbe(
     return Status::OK();
   }
   DCHECK_EQ(null_probe_rows_->num_rows(), matched_null_probe_.size());
-
-  // Bring the build side into memory, since we need to do a pass over it for
-  // every probe row.
-  bool got_rows;
-  scoped_ptr<RowBatch> build_rows;
-  RETURN_IF_ERROR(build->GetRows(mem_tracker(), &build_rows, &got_rows));
-  if (!got_rows) return NullAwareAntiJoinError(build);
-
   bool got_read_buffer;
   RETURN_IF_ERROR(null_probe_rows_->PrepareForRead(false, &got_read_buffer));
   DCHECK(got_read_buffer) << "Probe stream should always have a read or write iterator";
@@ -1049,6 +1053,11 @@ Status PartitionedHashJoinNode::EvaluateNullProbe(
   ScalarExprEvaluator* const* join_conjunct_evals = other_join_conjunct_evals_.data();
   int num_join_conjuncts = other_join_conjuncts_.size();
   RowBatch probe_batch(child(0)->row_desc(), runtime_state_->batch_size(), mem_tracker());
+
+  bool pinned;
+  RETURN_IF_ERROR(build->PinStream(&pinned));
+  if (!pinned) return NullAwareAntiJoinError(build);
+
   // For each probe row, iterate over all rows in the build table.
   SCOPED_TIMER(null_aware_eval_timer_);
   int64_t probe_row_idx = 0;
@@ -1059,18 +1068,24 @@ Status PartitionedHashJoinNode::EvaluateNullProbe(
       // This loop may run for a long time. Check for cancellation.
       RETURN_IF_CANCELLED(state);
       if (matched_null_probe_[probe_row_idx]) continue;
-      for (int j = 0; j < build_rows->num_rows(); ++j) {
-        // This loop may run for a long time if the number of build_rows is large.
-        // Periodically check for cancellation.
-        if (j % 1024 == 0) RETURN_IF_CANCELLED(state);
-        CreateOutputRow(
-            semi_join_staging_row_, probe_batch.GetRow(i), build_rows->GetRow(j));
-        if (ExecNode::EvalConjuncts(
-                join_conjunct_evals, num_join_conjuncts, semi_join_staging_row_)) {
-          matched_null_probe_[probe_row_idx] = true;
-          break;
+      bool got_reservation;
+      RETURN_IF_ERROR(build->PrepareForRead(false, &got_reservation));
+      DCHECK(got_reservation) << "Should have been pinned";
+      RowBatch build_batch(child(1)->row_desc(), state->batch_size(), mem_tracker());
+      bool build_eos;
+      do {
+        RETURN_IF_ERROR(build->GetNext(&build_batch, &build_eos));
+        FOREACH_ROW(&build_batch, 0, iter) {
+          CreateOutputRow(semi_join_staging_row_, probe_batch.GetRow(i), iter.Get());
+          if (ExecNode::EvalConjuncts(
+                  join_conjunct_evals, num_join_conjuncts, semi_join_staging_row_)) {
+            matched_null_probe_[probe_row_idx] = true;
+            break;
+          }
        }
-      }
+        build_batch.Reset();
+        RETURN_IF_CANCELLED(state);
+      } while (!matched_null_probe_[probe_row_idx] && !build_eos);
     }
     probe_batch.Reset();
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/30629fde/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index 572be34..a0c03ef 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -478,9 +478,8 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// This list is populated at CleanUpHashPartitions().
   std::list<PhjBuilder::Partition*> output_build_partitions_;
 
-  /// Used while processing null_aware_partition_. It contains all the build tuple rows
-  /// with a NULL when evaluating the hash table expr.
-  boost::scoped_ptr<RowBatch> nulls_build_batch_;
+  /// Whether this join is in a state outputting rows from OutputNullAwareProbeRows().
+  bool output_null_aware_probe_rows_running_;
 
   /// Partition used if 'null_aware_' is set. During probing, rows from the probe
   /// side that did not have a match in the hash table are appended to this partition.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/30629fde/be/src/runtime/buffered-tuple-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index 08ce7c3..ef66824 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -914,23 +914,6 @@ TEST_F(SimpleTupleStreamTest, BigRowMemoryUse) {
   stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
 }
 
-// Test for IMPALA-3923: overflow of 32-bit int in GetRows().
-TEST_F(SimpleTupleStreamTest, TestGetRowsOverflow) {
-  Init(BUFFER_POOL_LIMIT);
-  BufferedTupleStream stream(runtime_state_, int_desc_, &client_, PAGE_LEN, PAGE_LEN);
-  ASSERT_OK(stream.Init(-1, true));
-
-  Status status;
-  // Add more rows than can be fit in a RowBatch (limited by its 32-bit row count).
-  // Actually adding the rows would take a very long time, so just set num_rows_.
-  // This puts the stream in an inconsistent state, but exercises the right code path.
-  stream.num_rows_ = 1L << 33;
-  bool got_rows;
-  scoped_ptr<RowBatch> overflow_batch;
-  ASSERT_FALSE(stream.GetRows(&tracker_, &overflow_batch, &got_rows).ok());
-  stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
-}
-
 // Test rows greater than the default page size. Also exercise the read/write
 // mode with large pages.
 TEST_F(SimpleTupleStreamTest, BigStringReadWrite) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/30629fde/be/src/runtime/buffered-tuple-stream.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc
index e0a14bb..f5668c7 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -677,30 +677,6 @@ void BufferedTupleStream::UnpinStream(UnpinMode mode) {
   CHECK_CONSISTENCY_FULL();
 }
 
-Status BufferedTupleStream::GetRows(
-    MemTracker* tracker, scoped_ptr<RowBatch>* batch, bool* got_rows) {
-  if (num_rows() > numeric_limits<int>::max()) {
-    // RowBatch::num_rows_ is a 32-bit int, avoid an overflow.
-    return Status(Substitute("Trying to read $0 rows into in-memory batch failed. Limit "
-        "is $1",
-        num_rows(), numeric_limits<int>::max()));
-  }
-  RETURN_IF_ERROR(PinStream(got_rows));
-  if (!*got_rows) return Status::OK();
-  bool got_reservation;
-  RETURN_IF_ERROR(PrepareForRead(false, &got_reservation));
-  DCHECK(got_reservation) << "Stream was pinned";
-  batch->reset(new RowBatch(desc_, num_rows(), tracker));
-  bool eos = false;
-  // Loop until GetNext fills the entire batch. Each call can stop at page
-  // boundaries. We generally want it to stop, so that pages can be freed
-  // as we read. It is safe in this case because we pin the entire stream.
-  while (!eos) {
-    RETURN_IF_ERROR(GetNext(batch->get(), &eos));
-  }
-  return Status::OK();
-}
-
 Status BufferedTupleStream::GetNext(RowBatch* batch, bool* eos) {
   return GetNextInternal<false>(batch, eos, nullptr);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/30629fde/be/src/runtime/buffered-tuple-stream.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h
index dbf3faf..565b5fa 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -55,8 +55,7 @@ class TupleRow;
 /// To use write-only mode, PrepareForWrite() is called once and AddRow()/AddRowCustom*()
 /// are called repeatedly to initialize then advance a write iterator through the stream.
 /// Once the stream is fully written, it can be read back by calling PrepareForRead()
-/// then GetNext() repeatedly to advance a read iterator through the stream, or by
-/// calling GetRows() to get all of the rows at once.
+/// then GetNext() repeatedly to advance a read iterator through the stream.
 ///
 /// To use read/write mode, PrepareForReadWrite() is called once to initialize the read
 /// and write iterators. AddRow()/AddRowCustom*() then advance a write iterator through
@@ -124,7 +123,7 @@ class TupleRow;
 /// the tuple to be valid, we only need to update pointers to point to the var len data
 /// in the stream. These pointers need to be updated by the stream because a spilled
 /// page's data may be relocated to a different buffer. The pointers are updated lazily
-/// upon reading the stream via GetNext() or GetRows().
+/// upon reading the stream via GetNext().
 ///
 /// Example layout for a row with two non-nullable tuples ((1, "hello"), (2, "world"))
 /// with all var len data stored in the stream:
@@ -181,10 +180,10 @@ class TupleRow;
 ///
 /// Memory lifetime of rows read from stream:
 /// If the stream is pinned and delete on read is false, it is valid to access any tuples
-/// returned via GetNext() or GetRows() until the stream is unpinned. If the stream is
-/// unpinned or delete on read is true, then the batch returned from GetNext() may have
-/// the needs_deep_copy flag set, which means that any tuple memory returned so far from
-/// the stream may be freed on the next call to GetNext().
+/// returned via GetNext() until the stream is unpinned. If the stream is unpinned or
+/// delete on read is true, then the batch returned from GetNext() may have the
+/// needs_deep_copy flag set, which means that any tuple memory returned so far from the
+/// stream may be freed on the next call to GetNext().
 /// TODO: IMPALA-4179, instead of needs_deep_copy, attach the pages' buffers to the batch.
 ///
 /// Manual construction of rows with AddRowCustomBegin()/AddRowCustomEnd():
@@ -195,7 +194,7 @@ class TupleRow;
 /// AddRowCustomEnd() when done.
 ///
 /// If a caller constructs a tuple in this way, the caller can set the pointers and they
-/// will not be modified until the stream is read via GetNext() or GetRows().
+/// will not be modified until the stream is read via GetNext().
 /// TODO: IMPALA-5007: try to remove AddRowCustom*() by unifying with AddRow().
 ///
 /// TODO: we need to be able to do read ahead for pages. We need some way to indicate a
@@ -332,13 +331,6 @@ class BufferedTupleStream {
   Status GetNext(
       RowBatch* batch, bool* eos, std::vector<FlatRowPtr>* flat_rows) WARN_UNUSED_RESULT;
 
-  /// Returns all the rows in the stream in batch. This pins the entire stream in the
-  /// process. If the current unused reservation is not sufficient to pin the stream in
-  /// memory, this will try to increase the reservation. If that fails, 'got_rows' is set
-  /// to false.
-  Status GetRows(MemTracker* tracker, boost::scoped_ptr<RowBatch>* batch,
-      bool* got_rows) WARN_UNUSED_RESULT;
-
   /// Must be called once at the end to cleanup all resources. If 'batch' is non-NULL,
   /// attaches buffers from pinned pages that rows returned from GetNext() may reference.
   /// Otherwise deletes all pages. Does nothing if the stream was already closed. The
@@ -375,7 +367,6 @@ class BufferedTupleStream {
   friend class ArrayTupleStreamTest_TestArrayDeepCopy_Test;
   friend class ArrayTupleStreamTest_TestComputeRowSize_Test;
   friend class MultiNullableTupleStreamTest_TestComputeRowSize_Test;
-  friend class SimpleTupleStreamTest_TestGetRowsOverflow_Test;
 
   /// Wrapper around BufferPool::PageHandle that tracks additional info about the page.
   struct Page {
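
----------------------------------------------------------------------

For readers who want the replacement pattern in one place: the patch swaps each
GetRows() call for "pin the stream, attach a read iterator with PrepareForRead(),
then drain it with GetNext() one RowBatch at a time". The helper below is a
minimal, hypothetical sketch of that pattern, not code from this commit: the name
VisitAllRows, the RowVisitor typedef and the scratch_batch parameter are invented
for illustration, while PinStream(), PrepareForRead(), GetNext() and FOREACH_ROW
are used only as they appear in the diff above.

#include <functional>

#include "common/logging.h"
#include "common/status.h"
#include "runtime/buffered-tuple-stream.h"
#include "runtime/row-batch.h"

namespace impala {

typedef std::function<void(TupleRow*)> RowVisitor;

/// Illustrative sketch (not part of this commit): pins 'stream' and visits every
/// row one batch at a time, reusing the caller-provided 'scratch_batch'. If the
/// stream cannot be pinned, '*pinned' is set to false and OK is returned so the
/// caller can report its own error (the join node returns NullAwareAntiJoinError
/// in that case).
Status VisitAllRows(BufferedTupleStream* stream, RowBatch* scratch_batch,
    bool* pinned, const RowVisitor& visit) {
  // Pin the whole stream so returned tuple memory stays valid across GetNext().
  RETURN_IF_ERROR(stream->PinStream(pinned));
  if (!*pinned) return Status::OK();

  // Attach a read iterator; no extra read buffer should be needed since the
  // stream is pinned.
  bool got_reservation;
  RETURN_IF_ERROR(stream->PrepareForRead(false, &got_reservation));
  DCHECK(got_reservation) << "Should have been pinned";

  // Drain the stream one RowBatch at a time instead of one arbitrarily large batch.
  bool eos = false;
  do {
    RETURN_IF_ERROR(stream->GetNext(scratch_batch, &eos));
    FOREACH_ROW(scratch_batch, 0, iter) {
      visit(iter.Get());
    }
    scratch_batch->Reset();
  } while (!eos);
  return Status::OK();
}

}  // namespace impala

EvaluateNullProbe() and OutputNullAwareProbeRows() in the diff inline this shape
rather than sharing a helper, calling PrepareForRead() again before each pass over
the pinned build stream.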
