rtpsw commented on code in PR #34392:
URL: https://github.com/apache/arrow/pull/34392#discussion_r1211671161
##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -239,24 +344,48 @@ struct MemoStore {
// the time of the current entry, defaulting to 0.
// when entries with a time less than T are removed, the current time is updated to the
// time of the next (by-time) and now-current entry or to T if no such entry exists.
- OnType current_time_;
+ std::atomic<OnType> current_time_;
Review Comment:
This is related to your [other question
here](https://github.com/apache/arrow/pull/34392#discussion_r1211622318).
There, we need to set the current time on the `MemoStore` instance when a new
batch arrives. That update happens on the thread handling the incoming input
batch, while the batch itself is processed on a different thread (the one
running `ProcessThread`), and both threads use the same `MemoStore` instance.
`MemoStore::current_time_` therefore has to be synchronized between these
threads, which is why it is made atomic here.
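
To make the threading concern concrete, here is a minimal, standalone sketch of the pattern, not the code in this PR. `MemoStoreSketch`, `RunSketch`, the `OnType` alias, and the compare-exchange loop that keeps the time from moving backwards are all assumptions made for illustration; the real `MemoStore` in `asof_join_node.cc` carries much more state.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <thread>

// Hypothetical stand-in for the on-key type used by the as-of join.
using OnType = std::uint64_t;

// Simplified sketch of a store whose current time is written by one thread
// and read by another. Not the actual Acero MemoStore.
struct MemoStoreSketch {
  std::atomic<OnType> current_time_{0};

  // Called from the thread that receives an input batch.
  // Assumption: the time should only ever move forward, so a
  // compare-exchange loop keeps the maximum timestamp seen so far.
  void UpdateTime(OnType ts) {
    OnType prev = current_time_.load();
    while (prev < ts && !current_time_.compare_exchange_weak(prev, ts)) {
      // On failure, prev is reloaded with the latest value; retry.
    }
    assert(current_time_.load() >= ts);
  }

  // Called from the processing thread.
  OnType GetCurrentTime() const { return current_time_.load(); }
};

// Runs one writer ("input") thread and one reader ("process") thread against
// the same store. Returns true if the reader never observed the time going
// backwards and the final time equals last_ts.
bool RunSketch(OnType last_ts) {
  MemoStoreSketch store;
  bool monotonic = true;  // written only by the reader, read after join

  std::thread input_thread([&] {
    for (OnType ts = 1; ts <= last_ts; ++ts) store.UpdateTime(ts);
  });
  std::thread process_thread([&] {
    OnType prev = 0;
    while (prev < last_ts) {
      OnType now = store.GetCurrentTime();
      if (now < prev) monotonic = false;
      prev = now;
    }
  });

  input_thread.join();
  process_thread.join();
  return monotonic && store.GetCurrentTime() == last_ts;
}
```

With a plain `OnType` member, the concurrent write in `UpdateTime` and read in `GetCurrentTime` would be a data race. `std::atomic<OnType>` makes both accesses well-defined without adding a mutex around the store.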