This is an automated email from the ASF dual-hosted git repository. raulcd pushed a commit to branch maint-13.0.0 in repository https://gitbox.apache.org/repos/asf/arrow.git
commit f5a4f12ffb9b77a4a89138a1fb81d63cc7a32884 Author: rtpsw <[email protected]> AuthorDate: Wed Jul 12 22:58:30 2023 +0300 GH-36482: [C++][CI] Fix sporadic test failures in AsofJoinBasicTest (#36499) ### What changes are included in this PR? The key hasher is invalidated before the first invocation of `GetKey` (via `GetLatestKey`) after a new batch arrives. In the pre-PR code, this invalidation happens within `Advance`, which is called from `AdvanceAndMemoize` only after `GetLatestKey` is called. The change adds synchronization between the input-receiving and processing threads, because avoiding that would require a more complicated and brittle change, e.g., one that involves detecting in the processing thread when a ne [...] ### Are these changes tested? Yes, by existing tests. ### Are there any user-facing changes? No. **This PR contains a "Critical Fix".** * Closes: #36482 Authored-by: Yaron Gvili <[email protected]> Signed-off-by: Weston Pace <[email protected]> --- cpp/src/arrow/acero/asof_join_node.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index 98e5918ebb..b7f5d878e5 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -524,7 +524,7 @@ class KeyHasher { size_t index_; std::vector<col_index_t> indices_; std::vector<KeyColumnMetadata> metadata_; - const RecordBatch* batch_; + std::atomic<const RecordBatch*> batch_; std::vector<HashType> hashes_; LightContext ctx_; std::vector<KeyColumnArray> column_arrays_; @@ -819,7 +819,6 @@ class InputState { have_active_batch &= !queue_.TryPop(); if (have_active_batch) { DCHECK_GT(queue_.UnsyncFront()->num_rows(), 0); // empty batches disallowed - key_hasher_->Invalidate(); // batch changed - invalidate key hasher's cache memo_.UpdateTime(GetTime(queue_.UnsyncFront().get(), 0)); // time changed } } @@ -897,7 +896,8 @@ class InputState { Status Push(const 
std::shared_ptr<arrow::RecordBatch>& rb) { if (rb->num_rows() > 0) { - queue_.Push(rb); // only after above updates - push batch for processing + key_hasher_->Invalidate(); // batch changed - invalidate key hasher's cache + queue_.Push(rb); // only now push batch for processing } else { ++batches_processed_; // don't enqueue empty batches, just record as processed }
