IMPALA-5586: Null-aware anti-join can take a long time to cancel

Queries with a null-aware anti-join joining on a large number of NULLs can take a long time to cancel if threads are stuck in PartitionedHashJoinNode::EvaluateNullProbe(). This change adds RETURN_IF_CANCELLED checks to the loops in that function, so that it returns promptly once the query is cancelled.
Testing: Added logs to PartitionedHashJoinNode::EvaluateNullProbe() and made sure that the function returns right away on cancellation. Change-Id: I0800754d4ad31cbadbdfadc630c640963f3f6053 Reviewed-on: http://gerrit.cloudera.org:8080/7393 Tested-by: Impala Public Jenkins Reviewed-by: Tim Armstrong <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7ccbfe47 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7ccbfe47 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7ccbfe47 Branch: refs/heads/master Commit: 7ccbfe47fe1a0b693f6d94ae4b1062a0e3b66e88 Parents: 2c0fc30 Author: aphadke <[email protected]> Authored: Mon Jul 10 17:37:31 2017 -0700 Committer: Tim Armstrong <[email protected]> Committed: Tue Jul 25 03:57:14 2017 +0000 ---------------------------------------------------------------------- be/src/exec/partitioned-hash-join-node.cc | 19 +++++++++++++------ be/src/exec/partitioned-hash-join-node.h | 5 +++-- 2 files changed, 16 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7ccbfe47/be/src/exec/partitioned-hash-join-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc index 927bf1a..0f731d3 100644 --- a/be/src/exec/partitioned-hash-join-node.cc +++ b/be/src/exec/partitioned-hash-join-node.cc @@ -608,7 +608,7 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch // Finished up all probe rows for 'hash_partitions_'. We may have already cleaned up // the hash partitions, e.g. if we had to output some unmatched build rows below. 
if (builder_->num_hash_partitions() != 0) { - RETURN_IF_ERROR(CleanUpHashPartitions(out_batch)); + RETURN_IF_ERROR(CleanUpHashPartitions(state, out_batch)); if (out_batch->AtCapacity()) break; } @@ -914,7 +914,8 @@ Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state, RETURN_IF_ERROR(probe_stream->GetNext(probe_batch_.get(), &eos)); if (probe_batch_->num_rows() == 0) { - RETURN_IF_ERROR(EvaluateNullProbe(builder_->null_aware_partition()->build_rows())); + RETURN_IF_ERROR(EvaluateNullProbe( + state, builder_->null_aware_partition()->build_rows())); nulls_build_batch_.reset(); RETURN_IF_ERROR(PrepareNullAwareNullProbe()); return Status::OK(); @@ -994,7 +995,8 @@ void PartitionedHashJoinNode::CreateProbePartition( this, builder_->hash_partition(partition_idx), std::move(probe_rows)); } -Status PartitionedHashJoinNode::EvaluateNullProbe(BufferedTupleStream* build) { +Status PartitionedHashJoinNode::EvaluateNullProbe( + RuntimeState* state, BufferedTupleStream* build) { if (null_probe_rows_ == NULL || null_probe_rows_->num_rows() == 0) { return Status::OK(); } @@ -1011,13 +1013,17 @@ Status PartitionedHashJoinNode::EvaluateNullProbe(BufferedTupleStream* build) { ScalarExprEvaluator* const* join_conjunct_evals = other_join_conjunct_evals_.data(); int num_join_conjuncts = other_join_conjuncts_.size(); - DCHECK_LE(probe_rows->num_rows(), matched_null_probe_.size()); // For each row, iterate over all rows in the build table. SCOPED_TIMER(null_aware_eval_timer_); for (int i = 0; i < probe_rows->num_rows(); ++i) { + // This loop may run for a long time. Check for cancellation. + RETURN_IF_CANCELLED(state); if (matched_null_probe_[i]) continue; for (int j = 0; j < build_rows->num_rows(); ++j) { + // This loop may run for a long time if the number of build_rows is large. + // Periodically check for cancellation. 
+ if (j % 1024 == 0) RETURN_IF_CANCELLED(state); CreateOutputRow(semi_join_staging_row_, probe_rows->GetRow(i), build_rows->GetRow(j)); if (ExecNode::EvalConjuncts( @@ -1031,7 +1037,8 @@ Status PartitionedHashJoinNode::EvaluateNullProbe(BufferedTupleStream* build) { return Status::OK(); } -Status PartitionedHashJoinNode::CleanUpHashPartitions(RowBatch* batch) { +Status PartitionedHashJoinNode::CleanUpHashPartitions( + RuntimeState* state, RowBatch* batch) { DCHECK_EQ(probe_batch_pos_, -1); // At this point all the rows have been read from the probe side for all partitions in // hash_partitions_. @@ -1090,7 +1097,7 @@ Status PartitionedHashJoinNode::CleanUpHashPartitions(RowBatch* batch) { // For NAAJ, we need to try to match all the NULL probe rows with this partition // before closing it. The NULL probe rows could have come from any partition // so we collect them all and match them at the end. - RETURN_IF_ERROR(EvaluateNullProbe(build_partition->build_rows())); + RETURN_IF_ERROR(EvaluateNullProbe(state, build_partition->build_rows())); build_partition->Close(batch); } else { build_partition->Close(batch); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7ccbfe47/be/src/exec/partitioned-hash-join-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h index bf90ae4..41493d0 100644 --- a/be/src/exec/partitioned-hash-join-node.h +++ b/be/src/exec/partitioned-hash-join-node.h @@ -331,7 +331,8 @@ class PartitionedHashJoinNode : public BlockingJoinNode { /// rows in build. This updates matched_null_probe_, short-circuiting if one of the /// conjuncts pass (i.e. there is a match). /// This is used for NAAJ, when there are NULL probe rows. 
- Status EvaluateNullProbe(BufferedTupleStream* build) WARN_UNUSED_RESULT; + Status EvaluateNullProbe( + RuntimeState* state, BufferedTupleStream* build) WARN_UNUSED_RESULT; /// Prepares to output NULLs on the probe side for NAAJ. Before calling this, /// matched_null_probe_ should have been fully evaluated. @@ -351,7 +352,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode { /// unmatched rows. /// - If the build partition did not have a hash table, meaning both build and probe /// rows were spilled, move the partition to 'spilled_partitions_'. - Status CleanUpHashPartitions(RowBatch* batch) WARN_UNUSED_RESULT; + Status CleanUpHashPartitions(RuntimeState* state, RowBatch* batch) WARN_UNUSED_RESULT; /// Get the next row batch from the probe (left) side (child(0)). If we are done /// consuming the input, sets 'probe_batch_pos_' to -1, otherwise, sets it to 0.
