Repository: incubator-impala Updated Branches: refs/heads/master c67b198a1 -> acaf8b9f0
IMPALA-5570: fix spilling null-aware anti join IMPALA-4672: Part 2 regressed NAAJ by tightening up the spilling invariants (e.g. can't unpin with spilling disabled) but we didn't have tests for spilling NAAJs that could detect the regression. This patch adds those tests, fixes the regressions, and improves NAAJ by reliably spilling the probe side and not trying to bring the whole probe side into memory. The changes are: * All null-aware streams start off in memory and are only unpinned if spilling is enabled. * The null-aware build partition can be spilled in the same way as hash partitions. * Probe streams are unpinned whenever there is memory pressure - if spilling is enabled and either a build partition is spilled or appending to the probe stream fails. * Spilled probe streams are not re-pinned in EvaluateNullProbe(). Instead we just iterate over the rows of the stream. Testing: Add query tests where the three different buckets of rows are large enough to spill: the build and probe of the null-aware partition and the null probe rows. Test both spilling and in-memory (with spilling disabled) cases. 
Change-Id: Ie2e60eb4dd32bd287a31479a6232400df65964c1 Reviewed-on: http://gerrit.cloudera.org:8080/7367 Reviewed-by: Tim Armstrong <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/acaf8b9f Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/acaf8b9f Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/acaf8b9f Branch: refs/heads/master Commit: acaf8b9f0cb7ea94b1c1ab8cd6a2f7c89d8be2e4 Parents: c67b198 Author: Tim Armstrong <[email protected]> Authored: Mon Jun 26 20:50:52 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Thu Aug 24 04:24:10 2017 +0000 ---------------------------------------------------------------------- be/src/exec/partitioned-hash-join-builder.cc | 110 +++++++++------- be/src/exec/partitioned-hash-join-builder.h | 21 ++- be/src/exec/partitioned-hash-join-node-ir.cc | 11 +- be/src/exec/partitioned-hash-join-node.cc | 128 +++++++++++++------ be/src/exec/partitioned-hash-join-node.h | 19 +++ .../spilling-naaj-no-deny-reservation.test | 122 ++++++++++++++++++ .../queries/QueryTest/spilling-naaj.test | 102 +++++++++++++++ tests/query_test/test_spilling.py | 13 +- 8 files changed, 435 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/acaf8b9f/be/src/exec/partitioned-hash-join-builder.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc index 46afb33..0c04541 100644 --- a/be/src/exec/partitioned-hash-join-builder.cc +++ b/be/src/exec/partitioned-hash-join-builder.cc @@ -235,6 +235,9 @@ Status PhjBuilder::FlushFinal(RuntimeState* state) { << " (fraction=" << fixed << setprecision(2) << percent << "%)" << endl << " #rows:" << 
partition->build_rows()->num_rows() << endl; } + if (null_aware_partition_ != nullptr) { + ss << " Null-aware partition: " << null_aware_partition_->DebugString(); + } VLOG(2) << ss.str(); } @@ -243,6 +246,12 @@ Status PhjBuilder::FlushFinal(RuntimeState* state) { non_empty_build_ |= (num_build_rows > 0); } + if (null_aware_partition_ != nullptr && null_aware_partition_->is_spilled()) { + // Free up memory for the hash tables of other partitions by unpinning the + // last block of the null aware partition's stream. + RETURN_IF_ERROR(null_aware_partition_->Spill(BufferedTupleStream::UNPIN_ALL)); + } + RETURN_IF_ERROR(BuildHashTablesAndPrepareProbeStreams()); return Status::OK(); } @@ -308,38 +317,42 @@ bool PhjBuilder::AppendRowStreamFull( } // TODO: can we do better with a different spilling heuristic? -Status PhjBuilder::SpillPartition(BufferedTupleStream::UnpinMode mode) { +Status PhjBuilder::SpillPartition(BufferedTupleStream::UnpinMode mode, + Partition** spilled_partition) { DCHECK_EQ(hash_partitions_.size(), PARTITION_FANOUT); - int64_t max_freed_mem = 0; - int partition_idx = -1; - - // Iterate over the partitions and pick the largest partition to spill. - for (int i = 0; i < PARTITION_FANOUT; ++i) { - Partition* candidate = hash_partitions_[i]; - if (candidate->IsClosed()) continue; - if (candidate->is_spilled()) continue; - int64_t mem = candidate->build_rows()->BytesPinned(false); - if (candidate->hash_tbl() != NULL) { - // The hash table should not have matches, since we have not probed it yet. - // Losing match info would lead to incorrect results (IMPALA-1488). - DCHECK(!candidate->hash_tbl()->HasMatches()); - mem += candidate->hash_tbl()->ByteSize(); - } - if (mem > max_freed_mem) { - max_freed_mem = mem; - partition_idx = i; + Partition* best_candidate = nullptr; + if (null_aware_partition_ != nullptr && null_aware_partition_->CanSpill()) { + // Spill null-aware partition first if possible - it is always processed last. 
+ best_candidate = null_aware_partition_; + } else { + // Iterate over the partitions and pick the largest partition to spill. + int64_t max_freed_mem = 0; + for (Partition* candidate : hash_partitions_) { + if (!candidate->CanSpill()) continue; + int64_t mem = candidate->build_rows()->BytesPinned(false); + if (candidate->hash_tbl() != NULL) { + // The hash table should not have matches, since we have not probed it yet. + // Losing match info would lead to incorrect results (IMPALA-1488). + DCHECK(!candidate->hash_tbl()->HasMatches()); + mem += candidate->hash_tbl()->ByteSize(); + } + if (mem > max_freed_mem) { + max_freed_mem = mem; + best_candidate = candidate; + } } } - if (partition_idx == -1) { + if (best_candidate == nullptr) { return Status(Substitute("Internal error: could not find a partition to spill in " - " hash join $1: \n$2\nClient:\n$3", + " hash join $0: \n$1\nClient:\n$2", join_node_id_, DebugString(), buffer_pool_client_->DebugString())); } - VLOG(2) << "Spilling partition: " << partition_idx << endl << DebugString(); - Partition* build_partition = hash_partitions_[partition_idx]; - RETURN_IF_ERROR(build_partition->Spill(mode)); + VLOG(2) << "Spilling partition: " << best_candidate->DebugString() << endl + << DebugString(); + RETURN_IF_ERROR(best_candidate->Spill(mode)); + if (spilled_partition != nullptr) *spilled_partition = best_candidate; return Status::OK(); } @@ -436,8 +449,11 @@ Status PhjBuilder::InitSpilledPartitionProbeStreams() { RETURN_IF_ERROR(probe_stream->PrepareForWrite(&got_buffer)); if (got_buffer) break; - RETURN_IF_ERROR(SpillPartition(BufferedTupleStream::UNPIN_ALL)); - ++probe_streams_to_create; + Partition* spilled_partition; + RETURN_IF_ERROR(SpillPartition( + BufferedTupleStream::UNPIN_ALL, &spilled_partition)); + // Don't need to create a probe stream for the null-aware partition. 
+ if (spilled_partition != null_aware_partition_) ++probe_streams_to_create; } --probe_streams_to_create; } @@ -705,6 +721,27 @@ not_built: return status; } +std::string PhjBuilder::Partition::DebugString() { + stringstream ss; + ss << "<Partition>: ptr=" << this; + if (IsClosed()) { + ss << " Closed"; + return ss.str(); + } + if (is_spilled()) { + ss << " Spilled"; + } + DCHECK(build_rows() != nullptr); + ss << endl + << " Build Rows: " << build_rows_->num_rows() + << " (Bytes pinned: " << build_rows_->BytesPinned(false) << ")" + << endl; + if (hash_tbl_ != NULL) { + ss << " Hash Table Rows: " << hash_tbl_->size(); + } + return ss.str(); +} + void PhjBuilder::Codegen(LlvmCodeGen* codegen) { Status build_codegen_status; Status insert_codegen_status; @@ -739,23 +776,10 @@ string PhjBuilder::DebugString() const { stringstream ss; ss << "Hash partitions: " << hash_partitions_.size() << ":" << endl; for (int i = 0; i < hash_partitions_.size(); ++i) { - Partition* partition = hash_partitions_[i]; - ss << " Hash partition " << i << " ptr=" << partition; - if (partition->IsClosed()) { - ss << " Closed"; - continue; - } - if (partition->is_spilled()) { - ss << " Spilled"; - } - DCHECK(partition->build_rows() != NULL); - ss << endl - << " Build Rows: " << partition->build_rows()->num_rows() - << " (Bytes pinned: " << partition->build_rows()->BytesPinned(false) << ")" - << endl; - if (partition->hash_tbl() != NULL) { - ss << " Hash Table Rows: " << partition->hash_tbl()->size() << endl; - } + ss << " Hash partition " << i << " " << hash_partitions_[i]->DebugString() << endl; + } + if (null_aware_partition_ != nullptr) { + ss << "Null-aware partition: " << null_aware_partition_->DebugString(); } return ss.str(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/acaf8b9f/be/src/exec/partitioned-hash-join-builder.h ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-builder.h 
b/be/src/exec/partitioned-hash-join-builder.h index 51bcd63..eba9e9f 100644 --- a/be/src/exec/partitioned-hash-join-builder.h +++ b/be/src/exec/partitioned-hash-join-builder.h @@ -201,14 +201,25 @@ class PhjBuilder : public DataSink { Status BuildHashTable(bool* built) WARN_UNUSED_RESULT; /// Spills this partition, the partition's stream is unpinned with 'mode' and - /// its hash table is destroyed if it was built. + /// its hash table is destroyed if it was built. Calling with 'mode' UNPIN_ALL + /// unpins all pages and frees all buffers associated with the partition so that + /// the partition does not use any reservation. Calling with 'mode' + /// UNPIN_ALL_EXCEPT_CURRENT may leave the read or write pages of the unpinned stream + /// pinned and therefore using reservation. If the partition was previously + /// spilled with mode UNPIN_ALL_EXCEPT_CURRENT, then calling Spill() again with + /// UNPIN_ALL may release more reservation by unpinning the read or write page + /// in the stream. Status Spill(BufferedTupleStream::UnpinMode mode) WARN_UNUSED_RESULT; + std::string DebugString(); + bool ALWAYS_INLINE IsClosed() const { return build_rows_ == NULL; } BufferedTupleStream* ALWAYS_INLINE build_rows() { return build_rows_.get(); } HashTable* ALWAYS_INLINE hash_tbl() const { return hash_tbl_.get(); } bool ALWAYS_INLINE is_spilled() const { return is_spilled_; } int ALWAYS_INLINE level() const { return level_; } + /// Return true if the partition can be spilled - is not closed and is not spilled. + bool CanSpill() const { return !IsClosed() && !is_spilled(); } private: /// Inserts each row in 'batch' into 'hash_tbl_' using 'ctx'. 'flat_rows' is an array @@ -301,8 +312,11 @@ class PhjBuilder : public DataSink { /// Frees memory by spilling one of the hash partitions. The 'mode' argument is passed /// to the Spill() call for the selected partition. The current policy is to spill the - /// largest partition. Returns non-ok status if we couldn't spill a partition. 
- Status SpillPartition(BufferedTupleStream::UnpinMode mode) WARN_UNUSED_RESULT; + /// null-aware partition first (if a NAAJ), then the largest partition. Returns non-ok + /// status if we couldn't spill a partition. If 'spilled_partition' is non-NULL, set + /// to the partition that was spilled. + Status SpillPartition(BufferedTupleStream::UnpinMode mode, + Partition** spilled_partition = nullptr) WARN_UNUSED_RESULT; /// Tries to build hash tables for all unspilled hash partitions. Called after /// FlushFinal() when all build rows have been partitioned and added to the appropriate @@ -452,6 +466,7 @@ class PhjBuilder : public DataSink { /// store all the rows for which 'build_expr_evals_' evaluated over the row returns /// NULL (i.e. it has a NULL on the eq join slot). /// NULL if the join is not null aware or we are done processing this partition. + /// This partition starts off in memory but can be spilled. Partition* null_aware_partition_; /// Populated during the hash table building phase if any partitions spilled. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/acaf8b9f/be/src/exec/partitioned-hash-join-node-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-node-ir.cc b/be/src/exec/partitioned-hash-join-node-ir.cc index 3106419..a2a63c1 100644 --- a/be/src/exec/partitioned-hash-join-node-ir.cc +++ b/be/src/exec/partitioned-hash-join-node-ir.cc @@ -314,7 +314,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow( // Skip the current row if we manage to append to the spilled partition's BTS. // Otherwise, we need to bail out and report the failure. 
BufferedTupleStream* probe_rows = probe_partition->probe_rows(); - if (UNLIKELY(!AppendProbeRow(probe_rows, current_probe_row_, status))) { + if (UNLIKELY(!AppendSpilledProbeRow(probe_rows, current_probe_row_, status))) { DCHECK(!status->ok()); return false; } @@ -437,13 +437,20 @@ int PartitionedHashJoinNode::ProcessProbeBatch(TPrefetchMode::type prefetch_mode return num_rows_added; } -inline bool PartitionedHashJoinNode::AppendProbeRow( +inline bool PartitionedHashJoinNode::AppendSpilledProbeRow( BufferedTupleStream* stream, TupleRow* row, Status* status) { DCHECK(stream->has_write_iterator()); DCHECK(!stream->is_pinned()); return stream->AddRow(row, status); } +inline bool PartitionedHashJoinNode::AppendProbeRow( + BufferedTupleStream* stream, TupleRow* row, Status* status) { + DCHECK(stream->has_write_iterator()); + if (LIKELY(stream->AddRow(row, status))) return true; + return AppendProbeRowSlow(stream, row, status); // Don't cross-compile the slow path. +} + template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::INNER_JOIN>( TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx, Status* status); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/acaf8b9f/be/src/exec/partitioned-hash-join-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc index 2a1544c..4faa12e 100644 --- a/be/src/exec/partitioned-hash-join-node.cc +++ b/be/src/exec/partitioned-hash-join-node.cc @@ -24,14 +24,15 @@ #include "codegen/llvm-codegen.h" #include "exec/hash-table.inline.h" -#include "exprs/scalar-expr.h" #include "exprs/scalar-expr-evaluator.h" +#include "exprs/scalar-expr.h" #include "exprs/slot-ref.h" #include "runtime/buffered-tuple-stream.inline.h" #include "runtime/mem-tracker.h" #include "runtime/row-batch.h" #include "runtime/runtime-state.h" #include "util/debug-util.h" +#include "util/pretty-printer.h" 
#include "util/runtime-profile-counters.h" #include "gen-cpp/PlanNodes_types.h" @@ -268,7 +269,6 @@ PartitionedHashJoinNode::ProbePartition::ProbePartition(RuntimeState* state, : build_partition_(build_partition), probe_rows_(std::move(probe_rows)) { DCHECK(probe_rows_->has_write_iterator()); - DCHECK(!probe_rows_->is_pinned()); } PartitionedHashJoinNode::ProbePartition::~ProbePartition() { @@ -820,9 +820,15 @@ Status PartitionedHashJoinNode::OutputNullAwareNullProbe(RuntimeState* state, // effort, this would behave very slowly (we'd need to do IO for each row). This seems // like a reasonable limitation for now. // TODO: revisit. -static Status NullAwareAntiJoinError(bool build) { - return Status(Substitute("Unable to perform Null-Aware Anti-Join. There are too" - " many NULLs on the $0 side to perform this join.", build ? "build" : "probe")); +Status PartitionedHashJoinNode::NullAwareAntiJoinError(BufferedTupleStream* rows) { + return Status(Substitute( + "Unable to perform Null-Aware Anti-Join. Could not get enough reservation to fit " + "all rows with NULLs from the build side in memory. Memory required for $0 rows " + "was $1. $2/$3 of the join's reservation was available for the rows.", + rows->num_rows(), PrettyPrinter::PrintBytes(rows->byte_size()), + PrettyPrinter::PrintBytes( + buffer_pool_client_.GetUnusedReservation() + rows->BytesPinned(false)), + PrettyPrinter::PrintBytes(buffer_pool_client_.GetReservation()))); } Status PartitionedHashJoinNode::InitNullAwareProbePartition() { @@ -831,8 +837,7 @@ Status PartitionedHashJoinNode::InitNullAwareProbePartition() { state, child(0)->row_desc(), &buffer_pool_client_, resource_profile_.spillable_buffer_size, resource_profile_.max_row_buffer_size); - // TODO: this should be pinned if spilling is disabled. 
- Status status = probe_rows->Init(id(), false); + Status status = probe_rows->Init(id(), true); if (!status.ok()) goto error; bool got_buffer; status = probe_rows->PrepareForWrite(&got_buffer); @@ -855,8 +860,8 @@ Status PartitionedHashJoinNode::InitNullProbeRows() { null_probe_rows_ = make_unique<BufferedTupleStream>(state, child(0)->row_desc(), &buffer_pool_client_, resource_profile_.spillable_buffer_size, resource_profile_.max_row_buffer_size); - // TODO: we shouldn't start with this unpinned if spilling is disabled. - RETURN_IF_ERROR(null_probe_rows_->Init(id(), false)); + // Start with stream pinned, unpin later if needed. + RETURN_IF_ERROR(null_probe_rows_->Init(id(), true)); bool got_buffer; RETURN_IF_ERROR(null_probe_rows_->PrepareForWrite(&got_buffer)); DCHECK(got_buffer) @@ -886,7 +891,7 @@ Status PartitionedHashJoinNode::PrepareNullAwarePartition() { // Bring the entire spilled build stream into memory and read into a single batch. bool got_rows; RETURN_IF_ERROR(build_stream->GetRows(mem_tracker(), &nulls_build_batch_, &got_rows)); - if (!got_rows) return NullAwareAntiJoinError(true); + if (!got_rows) return NullAwareAntiJoinError(build_stream); // Initialize the streams for read. 
bool got_read_buffer; @@ -960,16 +965,27 @@ Status PartitionedHashJoinNode::PrepareForProbe() { vector<unique_ptr<BufferedTupleStream>> probe_streams = builder_->TransferProbeStreams(); probe_hash_partitions_.resize(PARTITION_FANOUT); + bool have_spilled_hash_partitions = false; for (int i = 0; i < PARTITION_FANOUT; ++i) { PhjBuilder::Partition* build_partition = builder_->hash_partition(i); if (build_partition->IsClosed() || !build_partition->is_spilled()) continue; - + have_spilled_hash_partitions = true; DCHECK(!probe_streams.empty()) << "Builder should have created enough streams"; CreateProbePartition(i, std::move(probe_streams.back())); probe_streams.pop_back(); } DCHECK(probe_streams.empty()) << "Builder should not have created extra streams"; + // Unpin null-aware probe streams if any partitions spilled: we don't want to waste + // memory pinning the probe streams that might be needed to process spilled partitions. + if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN + && (have_spilled_hash_partitions + || builder_->null_aware_partition()->is_spilled())) { + null_probe_rows_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT); + null_aware_probe_partition_->probe_rows()->UnpinStream( + BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT); + } + // Initialize the hash_tbl_ caching array. for (int i = 0; i < PARTITION_FANOUT; ++i) { hash_tbls_[i] = builder_->hash_partition(i)->hash_tbl(); @@ -1002,6 +1018,15 @@ void PartitionedHashJoinNode::CreateProbePartition( this, builder_->hash_partition(partition_idx), std::move(probe_rows)); } +bool PartitionedHashJoinNode::AppendProbeRowSlow( + BufferedTupleStream* stream, TupleRow* row, Status* status) { + if (!status->ok()) return false; // Check if AddRow() set status. 
+ *status = runtime_state_->StartSpilling(mem_tracker()); + if (!status->ok()) return false; + stream->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT); + return stream->AddRow(row, status); +} + Status PartitionedHashJoinNode::EvaluateNullProbe( RuntimeState* state, BufferedTupleStream* build) { if (null_probe_rows_ == NULL || null_probe_rows_->num_rows() == 0) { @@ -1009,38 +1034,46 @@ Status PartitionedHashJoinNode::EvaluateNullProbe( } DCHECK_EQ(null_probe_rows_->num_rows(), matched_null_probe_.size()); - // Bring both the build and probe side into memory and do a pairwise evaluation. + // Bring the build side into memory, since we need to do a pass over it for + // every probe row. bool got_rows; scoped_ptr<RowBatch> build_rows; RETURN_IF_ERROR(build->GetRows(mem_tracker(), &build_rows, &got_rows)); - if (!got_rows) return NullAwareAntiJoinError(true); - scoped_ptr<RowBatch> probe_rows; - RETURN_IF_ERROR(null_probe_rows_->GetRows(mem_tracker(), &probe_rows, &got_rows)); - if (!got_rows) return NullAwareAntiJoinError(false); + if (!got_rows) return NullAwareAntiJoinError(build); + + bool got_read_buffer; + RETURN_IF_ERROR(null_probe_rows_->PrepareForRead(false, &got_read_buffer)); + DCHECK(got_read_buffer) << "Probe stream should always have a read or write iterator"; ScalarExprEvaluator* const* join_conjunct_evals = other_join_conjunct_evals_.data(); int num_join_conjuncts = other_join_conjuncts_.size(); - DCHECK_LE(probe_rows->num_rows(), matched_null_probe_.size()); - // For each row, iterate over all rows in the build table. + RowBatch probe_batch(child(0)->row_desc(), runtime_state_->batch_size(), mem_tracker()); + // For each probe row, iterate over all rows in the build table. SCOPED_TIMER(null_aware_eval_timer_); - for (int i = 0; i < probe_rows->num_rows(); ++i) { - // This loop may run for a long time. Check for cancellation. 
- RETURN_IF_CANCELLED(state); - if (matched_null_probe_[i]) continue; - for (int j = 0; j < build_rows->num_rows(); ++j) { - // This loop may run for a long time if the number of build_rows is large. - // Periodically check for cancellation. - if (j % 1024 == 0) RETURN_IF_CANCELLED(state); - CreateOutputRow(semi_join_staging_row_, probe_rows->GetRow(i), - build_rows->GetRow(j)); - if (ExecNode::EvalConjuncts( - join_conjunct_evals, num_join_conjuncts, semi_join_staging_row_)) { - matched_null_probe_[i] = true; - break; + int64_t probe_row_idx = 0; + bool probe_stream_eos = false; + while (!probe_stream_eos) { + RETURN_IF_ERROR(null_probe_rows_->GetNext(&probe_batch, &probe_stream_eos)); + for (int i = 0; i < probe_batch.num_rows(); ++i, ++probe_row_idx) { + // This loop may run for a long time. Check for cancellation. + RETURN_IF_CANCELLED(state); + if (matched_null_probe_[probe_row_idx]) continue; + for (int j = 0; j < build_rows->num_rows(); ++j) { + // This loop may run for a long time if the number of build_rows is large. + // Periodically check for cancellation. + if (j % 1024 == 0) RETURN_IF_CANCELLED(state); + CreateOutputRow( + semi_join_staging_row_, probe_batch.GetRow(i), build_rows->GetRow(j)); + if (ExecNode::EvalConjuncts( + join_conjunct_evals, num_join_conjuncts, semi_join_staging_row_)) { + matched_null_probe_[probe_row_idx] = true; + break; + } } } + probe_batch.Reset(); } - + DCHECK_EQ(probe_row_idx, null_probe_rows_->num_rows()); return Status::OK(); } @@ -1110,12 +1143,6 @@ Status PartitionedHashJoinNode::CleanUpHashPartitions( } } - // Just finished evaluating the null probe rows with all the non-spilled build - // partitions. Unpin this now to free this memory for repartitioning. 
- if (null_probe_rows_ != NULL) { - null_probe_rows_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT); - } - builder_->ClearHashPartitions(); probe_hash_partitions_.clear(); return Status::OK(); @@ -1178,7 +1205,8 @@ string PartitionedHashJoinNode::NodeDebugString() const { BufferedTupleStream* probe_rows = probe_partition->probe_rows(); if (probe_rows != NULL) { ss << " Probe Rows: " << probe_rows->num_rows() - << " (Bytes pinned: " << probe_rows->BytesPinned(false) << ")"; + << " (Bytes total/pinned: " << probe_rows->byte_size() << "/" + << probe_rows->BytesPinned(false) << ")"; } } ss << endl; @@ -1205,12 +1233,28 @@ string PartitionedHashJoinNode::NodeDebugString() const { if (build_partition->IsClosed()) { ss << " Build Partition Closed" << endl; } else { - ss << " Build Rows: " << build_partition->build_rows()->num_rows() << endl; + ss << " Spilled Build Rows: " << build_partition->build_rows()->num_rows() << endl; } - ss << " Probe Rows: " << input_partition_->probe_rows()->num_rows() << endl; + ss << " Spilled Probe Rows: " << input_partition_->probe_rows()->num_rows() << endl; } else { ss << "InputPartition: NULL" << endl; } + + if (null_aware_probe_partition_ != nullptr) { + ss << "null-aware probe partition ptr=" << null_aware_probe_partition_.get(); + BufferedTupleStream* probe_rows = null_aware_probe_partition_->probe_rows(); + if (probe_rows != NULL) { + ss << " Probe Rows: " << probe_rows->num_rows() + << " (Bytes total/pinned: " << probe_rows->byte_size() << "/" + << probe_rows->BytesPinned(false) << ")" << endl; + } + } + if (null_probe_rows_ != nullptr) { + ss << "null probe rows ptr=" << null_probe_rows_.get(); + ss << " Probe Rows: " << null_probe_rows_->num_rows() + << " (Bytes total/pinned: " << null_probe_rows_->byte_size() << "/" + << null_probe_rows_->BytesPinned(false) << ")" << endl; + } return ss.str(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/acaf8b9f/be/src/exec/partitioned-hash-join-node.h 
---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h index 6ed5269..cb51b9d 100644 --- a/be/src/exec/partitioned-hash-join-node.h +++ b/be/src/exec/partitioned-hash-join-node.h @@ -169,9 +169,20 @@ class PartitionedHashJoinNode : public BlockingJoinNode { /// Returns false and sets 'status' to an error if an error is encountered. This odd /// return convention is used to avoid emitting unnecessary code for ~Status in perf- /// critical code. + bool AppendSpilledProbeRow( + BufferedTupleStream* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT; + + /// Append the probe row 'row' to 'stream'. The stream may be pinned or unpinned + /// and must have a write buffer allocated. Unpins the stream if needed to append the + /// row, so this will succeed unless an error is encountered. Returns false and sets + /// 'status' to an error if an error is encountered. This odd return convention is + /// used to avoid emitting unnecessary code for ~Status in perf-critical code. bool AppendProbeRow( BufferedTupleStream* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT; + /// Slow path for AppendProbeRow() where appending fails initially. bool AppendProbeRowSlow(BufferedTupleStream* stream, TupleRow* row, Status* status); /// Probes the hash table for rows matching the current probe row and appends /// all the matching build rows (with probe row) to output batch. Returns true /// if probing is done for the current probe row and should continue to next row. @@ -368,6 +379,10 @@ class PartitionedHashJoinNode : public BlockingJoinNode { Status PrepareSpilledPartitionForProbe( RuntimeState* state, bool* got_partition) WARN_UNUSED_RESULT; + /// Construct an error status for the null-aware anti-join when it could not fit 'rows' + /// from the build side in memory. 
+ Status NullAwareAntiJoinError(BufferedTupleStream* rows); + /// Calls Close() on every probe partition, destroys the partitions and cleans up any /// references to the partitions. Also closes and destroys 'null_probe_rows_'. void CloseAndDeletePartitions(); @@ -468,10 +483,14 @@ class PartitionedHashJoinNode : public BlockingJoinNode { /// At the very end, we then iterate over the partition's probe rows. For each probe /// row, we return the rows that did not match any of the partition's build rows. This /// is NULL if this join is not null aware or we are done processing this partition. + /// The probe stream starts off in memory but is unpinned if there is memory pressure, + /// specifically if any partitions spilled or appending to the pinned stream failed. boost::scoped_ptr<ProbePartition> null_aware_probe_partition_; /// For NAAJ, this stream contains all probe rows that had NULL on the hash table /// conjuncts. Must be unique_ptr so we can release it and transfer to output batches. + /// The stream starts off in memory but is unpinned if there is memory pressure, + /// specifically if any partitions spilled or appending to the pinned stream failed. 
std::unique_ptr<BufferedTupleStream> null_probe_rows_; /// For each row in null_probe_rows_, true if this row has matched any build row http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/acaf8b9f/testdata/workloads/functional-query/queries/QueryTest/spilling-naaj-no-deny-reservation.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling-naaj-no-deny-reservation.test b/testdata/workloads/functional-query/queries/QueryTest/spilling-naaj-no-deny-reservation.test new file mode 100644 index 0000000..420e721 --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/spilling-naaj-no-deny-reservation.test @@ -0,0 +1,122 @@ +==== +---- QUERY +# Also see related tests in spilling-naaj.test +# +# ======================================================================================= +# NAAJ QUERY 1: many nulls on build side. +# ======================================================================================= +# The hash join must spill the null-aware build rows to make progress. +# This returns the same rows as: +# select * from lineitem +# where l_suppkey = 4162 and l_shipmode = 'AIR' +# and l_returnflag = 'A' and l_shipdate > '1993-01-01' +# except: +# * even-numbered l_orderkey values, because there is a NULL on the build side. 
+# * l_orderkey values > 5500000, because there is a NULL on the probe side +set buffer_pool_limit=12m; +select l_orderkey, l_partkey, l_suppkey, l_linenumber +from lineitem +where l_suppkey = 4162 and l_shipmode = 'AIR' and l_returnflag = 'A' and + l_shipdate > '1993-01-01' and + if(l_orderkey > 5500000, NULL, l_orderkey) not in ( + select if(o_orderkey % 2 = 0, NULL, o_orderkey + 1) + from orders + where l_orderkey = o_orderkey) +order by 1,2,3,4 +---- RESULTS +4013537,46649,4162,1 +4298819,16658,4162,2 +4431651,81653,4162,1 +5081287,119139,4162,1 +5187203,49153,4162,1 +---- TYPES +BIGINT,BIGINT,BIGINT,INT +==== +---- QUERY +set scratch_limit=0; +# Execute NAAJ query 1 with unlimited memory and spilling disabled. The query should succeed. +select l_orderkey, l_partkey, l_suppkey, l_linenumber +from lineitem +where l_suppkey = 4162 and l_shipmode = 'AIR' and l_returnflag = 'A' and + l_shipdate > '1993-01-01' and + if(l_orderkey > 5500000, NULL, l_orderkey) not in ( + select if(o_orderkey % 2 = 0, NULL, o_orderkey + 1) + from orders + where l_orderkey = o_orderkey) +order by 1,2,3,4 +---- RESULTS +4013537,46649,4162,1 +4298819,16658,4162,2 +4431651,81653,4162,1 +5081287,119139,4162,1 +5187203,49153,4162,1 +---- TYPES +BIGINT,BIGINT,BIGINT,INT +==== +---- QUERY +set buffer_pool_limit=7m; +# Execute NAAJ query 1 without enough memory to fit the null build rows in memory. +select l_orderkey, l_partkey, l_suppkey, l_linenumber +from lineitem +where l_suppkey = 4162 and l_shipmode = 'AIR' and l_returnflag = 'A' and + l_shipdate > '1993-01-01' and + if(l_orderkey > 5500000, NULL, l_orderkey) not in ( + select if(o_orderkey % 2 = 0, NULL, o_orderkey + 1) + from orders + where l_orderkey = o_orderkey) +order by 1,2,3,4 +---- CATCH +Unable to perform Null-Aware Anti-Join. Could not get enough reservation to fit all rows with NULLs from the build side in memory +==== +---- QUERY +# Execute NAAJ query 2 in-memory only. 
+set scratch_limit=0; +select l_orderkey, l_partkey, l_suppkey, l_linenumber +from lineitem +where if(l_orderkey % 2 = 0, NULL, l_partkey) not in ( + select p_partkey + from part + where p_retailprice != l_extendedprice * l_tax) + order by 1,2,3,4 limit 5 +---- RESULTS +965,107207,9718,1 +1351,107227,7228,1 +1505,122702,5215,2 +1601,174374,1926,2 +1767,22387,4890,4 +---- TYPES +BIGINT,BIGINT,BIGINT,INT +==== +---- QUERY +# Test that we can execute NAAJ query 3 with spilling disabled. +set scratch_limit=0; +select l_orderkey, l_partkey, l_suppkey, l_linenumber +from lineitem +where l_partkey not in ( + select if(p_partkey = 2, NULL, p_partkey) + from part + where p_partkey % 2 = 0 and p_retailprice != l_extendedprice * l_tax) +order by 1,2,3,4 +---- RESULTS +3178597,1001,3502,1 +4801283,199001,9002,1 +4958784,116009,1032,2 +---- TYPES +BIGINT,BIGINT,BIGINT,INT +==== +---- QUERY +# Test that we can execute NAAJ query 4 with spilling disabled. +set scratch_limit=0; +select l_orderkey, l_partkey, l_suppkey, l_linenumber +from lineitem +where if(l_partkey % 5 != 1, NULL, l_partkey) not in ( + select if(p_partkey = 2, NULL, p_partkey) + from part + where p_partkey % 2 = 0 and p_retailprice != l_extendedprice * l_tax) +order by 1,2,3,4 +---- RESULTS +3178597,1001,3502,1 +4801283,199001,9002,1 +---- TYPES +BIGINT,BIGINT,BIGINT,INT +==== http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/acaf8b9f/testdata/workloads/functional-query/queries/QueryTest/spilling-naaj.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling-naaj.test b/testdata/workloads/functional-query/queries/QueryTest/spilling-naaj.test new file mode 100644 index 0000000..4c0d790 --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/spilling-naaj.test @@ -0,0 +1,102 @@ +==== +---- QUERY +# Also see related tests in spilling-naaj-no-deny-reservation.test +# +# 
======================================================================================= +# NAAJ QUERY 2: many probe rows with NULLs in the join key. +# ======================================================================================= +# This produces the same results as: +# select l_orderkey, l_partkey, l_suppkey, l_linenumber +# from lineitem +# where l_orderkey % 2 = 1 and l_partkey not in ( +# select p_partkey +# from part +# where p_retailprice != l_extendedprice * l_tax) +# order by 1,2,3,4 limit 5 +# +# Which produces the same results as: +# select l_orderkey, l_partkey, l_suppkey, l_linenumber +# from lineitem +# join part on l_partkey = p_partkey +# where l_orderkey % 2 = 1 and p_retailprice = l_extendedprice * l_tax +# order by 1,2,3,4 limit 5 +# +set buffer_pool_limit=10m; +select l_orderkey, l_partkey, l_suppkey, l_linenumber +from lineitem +where if(l_orderkey % 2 = 0, NULL, l_partkey) not in ( + select p_partkey + from part + where p_retailprice != l_extendedprice * l_tax) +order by 1,2,3,4 limit 5 +---- RESULTS +965,107207,9718,1 +1351,107227,7228,1 +1505,122702,5215,2 +1601,174374,1926,2 +1767,22387,4890,4 +---- TYPES +BIGINT,BIGINT,BIGINT,INT +==== +---- QUERY +# Execute NAAJ query 2 in-memory only without enough memory to complete. +set scratch_limit=0; +set buffer_pool_limit=10m; +select l_orderkey, l_partkey, l_suppkey, l_linenumber +from lineitem +where if(l_orderkey % 2 = 0, NULL, l_partkey) not in ( + select p_partkey + from part + where p_retailprice != l_extendedprice * l_tax) + order by 1,2,3,4 limit 5 +---- CATCH +Could not free memory by spilling to disk: scratch_limit is 0 +==== +---- QUERY +# ======================================================================================= +# NAAJ QUERY 3: many non-NULL probe rows that didn't match a build row. 
+# ======================================================================================= +# The correlated subquery includes a NULL when l_extendedprice * l_tax != p_retailprice +# and all the even p_partkey values except 2. Thus this query returns the same results: +# +# select l_orderkey, l_partkey, l_suppkey, l_linenumber +# from lineitem +# join part on l_extendedprice * l_tax = p_retailprice +# where p_partkey = 2 and l_partkey % 2 = 1 +# order by 1,2,3,4 +# +set buffer_pool_limit=10m; +select l_orderkey, l_partkey, l_suppkey, l_linenumber +from lineitem +where l_partkey not in ( + select if(p_partkey = 2, NULL, p_partkey) + from part + where p_partkey % 2 = 0 and p_retailprice != l_extendedprice * l_tax) +order by 1,2,3,4 +---- RESULTS +3178597,1001,3502,1 +4801283,199001,9002,1 +4958784,116009,1032,2 +---- TYPES +BIGINT,BIGINT,BIGINT,INT +==== +---- QUERY +# ======================================================================================= +# NAAJ QUERY 4: many of both kinds of probe rows (NULL and non-NULL/unmatched) +# ======================================================================================= +# This returns one less row than the previous query because l_partkey=116009 is +# replaced with a NULL.
+set buffer_pool_limit=10m; +select l_orderkey, l_partkey, l_suppkey, l_linenumber +from lineitem +where if(l_partkey % 5 != 1, NULL, l_partkey) not in ( + select if(p_partkey = 2, NULL, p_partkey) + from part + where p_partkey % 2 = 0 and p_retailprice != l_extendedprice * l_tax) +order by 1,2,3,4 +---- RESULTS +3178597,1001,3502,1 +4801283,199001,9002,1 +---- TYPES +BIGINT,BIGINT,BIGINT,INT +==== http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/acaf8b9f/tests/query_test/test_spilling.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_spilling.py b/tests/query_test/test_spilling.py index 0ca0971..437739c 100644 --- a/tests/query_test/test_spilling.py +++ b/tests/query_test/test_spilling.py @@ -28,7 +28,6 @@ DEBUG_ACTION_DIMS = [None, '-1:OPEN:[email protected]', '-1:OPEN:[email protected]'] - @pytest.mark.xfail(pytest.config.option.testing_remote_cluster, reason='Queries may not spill on larger clusters') class TestSpilling(ImpalaTestSuite): @@ -57,6 +56,18 @@ class TestSpilling(ImpalaTestSuite): spilling to disk""" self.run_test_case('QueryTest/spilling-large-rows', vector, unique_database) + def test_spilling_naaj(self, vector): + """Test spilling null-aware anti-joins""" + self.run_test_case('QueryTest/spilling-naaj', vector) + + def test_spilling_naaj_no_deny_reservation(self, vector): + """ + Null-aware anti-join tests that depend on getting more than the minimum reservation + and therefore will not reliably pass with the deny reservation debug action enabled. + """ + if vector.get_value('exec_option')['debug_action'] is None: + self.run_test_case('QueryTest/spilling-naaj-no-deny-reservation', vector) + def test_spilling_sorts_exhaustive(self, vector): if self.exploration_strategy() != 'exhaustive': pytest.skip("only run large sorts on exhaustive")
