westonpace commented on code in PR #12289:
URL: https://github.com/apache/arrow/pull/12289#discussion_r868721601
##########
cpp/src/arrow/compute/exec/hash_join.cc:
##########
@@ -809,22 +1009,40 @@ class HashJoinBasicImpl : public HashJoinImpl {
ScanHashTable_num_tasks());
}
- bool QueueBatchIfNeeded(int side, ExecBatch batch) {
+ Result<bool> QueueBatchIfNeeded(size_t thread_index, int side, ExecBatch& batch) {
if (side == 0) {
- std::lock_guard<std::mutex> lock(left_batches_mutex_);
- if (has_hash_table_) {
- return false;
+ // We don't want to do the filtering while holding the lock, since that can get
+ // expensive.
+ bool needs_filtering;
+ {
+ std::lock_guard<std::mutex> lock(left_batches_mutex_);
+ bloom_filters_ready_ = bloom_filters_ready_ || num_expected_bloom_filters_ == 0;
+ needs_filtering = bloom_filters_ready_ && num_expected_bloom_filters_ != 0;
}
- left_batches_.emplace_back(std::move(batch));
- return true;
+ if (needs_filtering) RETURN_NOT_OK(ApplyBloomFiltersToBatch(thread_index, batch));
Review Comment:
Ah, you're right. I had thought the logic below was `queued = !bloom_filters_ready_ && !has_hash_table_;`
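
For anyone following the thread, the locking pattern in the hunk above (decide under the lock, filter outside it) can be sketched roughly as follows. This is a standalone illustration, not the code in the PR: `ProbeSide`, `FilterIfReady`, `MarkFiltersReady`, and the toy `ApplyFilter` are hypothetical names, and a `std::vector<int>` stands in for `ExecBatch`.

```cpp
#include <algorithm>
#include <mutex>
#include <vector>

// Hypothetical stand-in for the probe-side state in the diff; not Arrow's class.
class ProbeSide {
 public:
  explicit ProbeSide(int num_expected_filters)
      : num_expected_filters_(num_expected_filters) {}

  // Assumed to be called once all expected filters have been delivered.
  void MarkFiltersReady() {
    std::lock_guard<std::mutex> lock(mutex_);
    filters_ready_ = true;
  }

  // Returns true if the batch was filtered in place.
  bool FilterIfReady(std::vector<int>& batch) {
    bool needs_filtering;
    {
      // Hold the lock only long enough to read and update the flags.
      std::lock_guard<std::mutex> lock(mutex_);
      filters_ready_ = filters_ready_ || num_expected_filters_ == 0;
      needs_filtering = filters_ready_ && num_expected_filters_ != 0;
    }
    // The potentially expensive work runs with the lock released.
    if (needs_filtering) ApplyFilter(batch);
    return needs_filtering;
  }

 private:
  // Toy filter that drops odd values; a real Bloom filter probe would go here.
  static void ApplyFilter(std::vector<int>& batch) {
    batch.erase(std::remove_if(batch.begin(), batch.end(),
                               [](int v) { return v % 2 != 0; }),
                batch.end());
  }

  std::mutex mutex_;
  bool filters_ready_ = false;
  const int num_expected_filters_;
};
```

With zero expected filters the flag flips to ready but nothing is filtered. With one expected filter, batches pass through untouched until `MarkFiltersReady()` has been called, after which they are filtered without the mutex being held during the filtering itself.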