rtpsw commented on code in PR #34392:
URL: https://github.com/apache/arrow/pull/34392#discussion_r1211671161


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -239,24 +344,48 @@ struct MemoStore {
   // the time of the current entry, defaulting to 0.
   // when entries with a time less than T are removed, the current time is updated to the
   // time of the next (by-time) and now-current entry or to T if no such entry exists.
-  OnType current_time_;
+  std::atomic<OnType> current_time_;

Review Comment:
   This is related to your [other question
here](https://github.com/apache/arrow/pull/34392#discussion_r1211622318).
There, the current time of the `MemoStore` instance is set when a new batch
arrives. That write happens on a different thread (the one handling the
incoming input batch) than the one processing the batch (the one running
`ProcessThread`), and both threads use the same `MemoStore` instance. Since
`MemoStore.current_time_` is written by one thread and read by another, access
to it must be synchronized, and so it is made atomic here.
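
To illustrate the point, here is a minimal, self-contained sketch of the pattern; it is not the code in this PR. `MemoStoreSketch`, `UpdateTime`, `GetCurrentTime`, `RunSketch`, and the choice of `uint64_t` for `OnType` are hypothetical names and assumptions made for the example. It shows only one thread publishing a time through a `std::atomic` and another thread reading it.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <thread>

// Assumption: a 64-bit unsigned integer stands in for the join's time type.
using OnType = std::uint64_t;

// Hypothetical, stripped-down stand-in for MemoStore: only the shared time.
struct MemoStoreSketch {
  std::atomic<OnType> current_time_{0};

  // Called on the thread that handles an incoming input batch.
  void UpdateTime(OnType ts) { current_time_.store(ts); }

  // Called on the thread that processes batches.
  OnType GetCurrentTime() const { return current_time_.load(); }
};

// Runs one writer and one reader against the same store and returns the
// final published time.
OnType RunSketch() {
  MemoStoreSketch store;
  constexpr OnType kLast = 1000;

  // "Input" thread: publishes increasing times, one per simulated batch.
  std::thread input([&store] {
    for (OnType t = 1; t <= kLast; ++t) store.UpdateTime(t);
  });

  // "Process" thread: polls the published time until the last one is seen.
  std::thread process([&store] {
    OnType last_seen = 0;
    while (last_seen < kLast) {
      OnType now = store.GetCurrentTime();
      // A single writer storing increasing values never appears to go back.
      assert(now >= last_seen);
      last_seen = now;
      std::this_thread::yield();
    }
  });

  input.join();
  process.join();
  return store.GetCurrentTime();
}
```

With a plain `OnType` member, the concurrent write and read would be a data race and therefore undefined behavior in C++. Making the member a `std::atomic<OnType>` gives each load and store well-defined semantics without adding a mutex.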


